Add support in the fdb-kubernetes-monitor to read node labels (#11431)

* Add support in the fdb-kubernetes-monitor to read node labels
This commit is contained in:
Johannes Scheuermann 2024-06-14 07:41:48 +02:00 committed by GitHub
parent 1ab7a33bcf
commit d2dd3bce5a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 847 additions and 361 deletions

View File

@ -2,7 +2,7 @@
//
// This source file is part of the FoundationDB open source project
//
// Copyright 2021 Apple Inc. and the FoundationDB project authors
// Copyright 2021-2024 Apple Inc. and the FoundationDB project authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -95,8 +95,7 @@ const (
IPListArgumentType = "IPList"
)
// GenerateArgument processes an argument and generates its string
// representation.
// GenerateArgument processes an argument and generates its string representation.
func (argument Argument) GenerateArgument(processNumber int, env map[string]string) (string, error) {
switch argument.ArgumentType {
case "":
@ -134,9 +133,11 @@ func (argument Argument) LookupEnv(env map[string]string) (string, error) {
if env != nil {
value, present = env[argument.Source]
}
if !present {
value, present = os.LookupEnv(argument.Source)
}
if !present {
return "", fmt.Errorf("missing environment variable %s", argument.Source)
}
@ -163,11 +164,11 @@ func (argument Argument) LookupEnv(env map[string]string) (string, error) {
}
return "", fmt.Errorf("could not find IP with family %d", argument.IPFamily)
}
return value, nil
}
// GenerateArguments interprets the arguments in the process configuration and
// generates a command invocation.
// GenerateArguments interprets the arguments in the process configuration and generates a command invocation.
func (configuration *ProcessConfiguration) GenerateArguments(processNumber int, env map[string]string) ([]string, error) {
results := make([]string, 0, len(configuration.Arguments)+1)
if configuration.BinaryPath != "" {

View File

@ -2,7 +2,7 @@
//
// This source file is part of the FoundationDB open source project
//
// Copyright 2021 Apple Inc. and the FoundationDB project authors
// Copyright 2021-2024 Apple Inc. and the FoundationDB project authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -22,314 +22,227 @@ package api
import (
"encoding/json"
"os"
"reflect"
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func loadConfigFromFile(path string) (*ProcessConfiguration, error) {
func loadConfigFromFile(path string) *ProcessConfiguration {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
Expect(err).NotTo(HaveOccurred())
defer func() {
Expect(file.Close()).NotTo(HaveOccurred())
}()
decoder := json.NewDecoder(file)
config := &ProcessConfiguration{}
err = decoder.Decode(config)
if err != nil {
return nil, err
}
return config, nil
Expect(decoder.Decode(config)).NotTo(HaveOccurred())
return config
}
func TestGeneratingArgumentsForDefaultConfig(t *testing.T) {
config, err := loadConfigFromFile(".testdata/default_config.json")
if err != nil {
t.Error(err)
return
}
var _ = Describe("Testing FDB Kubernetes Monitor API", func() {
When("generating arguments for default config", func() {
var arguments []string
arguments, err := config.GenerateArguments(1, map[string]string{
"FDB_PUBLIC_IP": "10.0.0.1",
"FDB_POD_IP": "192.168.0.1",
"FDB_ZONE_ID": "zone1",
"FDB_INSTANCE_ID": "storage-1",
BeforeEach(func() {
var err error
config := loadConfigFromFile(".testdata/default_config.json")
arguments, err = config.GenerateArguments(1, map[string]string{
"FDB_PUBLIC_IP": "10.0.0.1",
"FDB_POD_IP": "192.168.0.1",
"FDB_ZONE_ID": "zone1",
"FDB_INSTANCE_ID": "storage-1",
})
Expect(err).NotTo(HaveOccurred())
})
It("should generate the expected arguments", func() {
Expect(arguments).To(HaveExactElements([]string{
"--cluster-file", ".testdata/fdb.cluster",
"--public-address", "10.0.0.1:4501", "--listen-address", "192.168.0.1:4501",
"--datadir", ".testdata/data/1", "--class", "storage",
"--locality-zoneid", "zone1", "--locality-instance-id", "storage-1",
"--locality-process-id", "storage-1-1",
}))
})
})
if err != nil {
t.Error(err)
return
}
expectedArguments := []string{
"--cluster-file", ".testdata/fdb.cluster",
"--public-address", "10.0.0.1:4501", "--listen-address", "192.168.0.1:4501",
"--datadir", ".testdata/data/1", "--class", "storage",
"--locality-zoneid", "zone1", "--locality-instance-id", "storage-1",
"--locality-process-id", "storage-1-1",
}
When("generating arguments for default config with a binary path specified", func() {
var arguments []string
if !reflect.DeepEqual(arguments, expectedArguments) {
t.Logf("Expected arguments %v, but got arguments %v", expectedArguments, arguments)
t.Fail()
}
BeforeEach(func() {
var err error
config := loadConfigFromFile(".testdata/default_config.json")
Expect(err).NotTo(HaveOccurred())
config.BinaryPath = "/usr/bin/fdbserver"
config.BinaryPath = "/usr/bin/fdbserver"
arguments, err = config.GenerateArguments(1, map[string]string{
"FDB_PUBLIC_IP": "10.0.0.1",
"FDB_POD_IP": "192.168.0.1",
"FDB_ZONE_ID": "zone1",
"FDB_INSTANCE_ID": "storage-1",
})
Expect(err).NotTo(HaveOccurred())
})
arguments, err = config.GenerateArguments(1, map[string]string{
"FDB_PUBLIC_IP": "10.0.0.1",
"FDB_POD_IP": "192.168.0.1",
"FDB_ZONE_ID": "zone1",
"FDB_INSTANCE_ID": "storage-1",
It("should generate the expected arguments", func() {
Expect(arguments).To(HaveExactElements([]string{
"/usr/bin/fdbserver",
"--cluster-file", ".testdata/fdb.cluster",
"--public-address", "10.0.0.1:4501", "--listen-address", "192.168.0.1:4501",
"--datadir", ".testdata/data/1", "--class", "storage",
"--locality-zoneid", "zone1", "--locality-instance-id", "storage-1",
"--locality-process-id", "storage-1-1",
}))
})
})
if err != nil {
t.Error(err)
return
}
expectedArguments = []string{
"/usr/bin/fdbserver",
"--cluster-file", ".testdata/fdb.cluster",
"--public-address", "10.0.0.1:4501", "--listen-address", "192.168.0.1:4501",
"--datadir", ".testdata/data/1", "--class", "storage",
"--locality-zoneid", "zone1", "--locality-instance-id", "storage-1",
"--locality-process-id", "storage-1-1",
}
When("generating arguments for environment variable", func() {
var argument string
var err error
var env map[string]string
if !reflect.DeepEqual(arguments, expectedArguments) {
t.Logf("Expected arguments %v, but got arguments %v", expectedArguments, arguments)
t.Fail()
}
}
JustBeforeEach(func() {
testArgument := Argument{ArgumentType: EnvironmentArgumentType, Source: "FDB_ZONE_ID"}
argument, err = testArgument.GenerateArgument(1, env)
})
func TestGeneratingArgumentForEnvironmentVariable(t *testing.T) {
argument := Argument{ArgumentType: EnvironmentArgumentType, Source: "FDB_ZONE_ID"}
When("the env variable is present", func() {
BeforeEach(func() {
env = map[string]string{"FDB_ZONE_ID": "zone1", "FDB_MACHINE_ID": "machine1"}
})
result, err := argument.GenerateArgument(1, map[string]string{"FDB_ZONE_ID": "zone1", "FDB_MACHINE_ID": "machine1"})
if err != nil {
t.Error(err)
return
}
if result != "zone1" {
t.Logf("Expected result zone1, but got result %v", result)
t.Fail()
return
}
It("should generate the expected arguments", func() {
Expect(err).NotTo(HaveOccurred())
Expect(argument).To(Equal("zone1"))
})
})
_, err = argument.GenerateArgument(1, map[string]string{"FDB_MACHINE_ID": "machine1"})
if err == nil {
t.Logf("Expected error result, but did not get an error")
t.Fail()
return
}
expectedError := "missing environment variable FDB_ZONE_ID"
if err.Error() != expectedError {
t.Logf("Expected error %s, but got error %s", expectedError, err)
t.Fail()
return
}
}
When("the env variable is absent", func() {
BeforeEach(func() {
env = map[string]string{"FDB_MACHINE_ID": "machine1"}
})
func TestGeneratingArgumentForIPList(t *testing.T) {
argument := Argument{ArgumentType: IPListArgumentType, Source: "FDB_PUBLIC_IP", IPFamily: 4}
It("should generate the expected arguments", func() {
Expect(err).To(HaveOccurred())
})
})
result, err := argument.GenerateArgument(1, map[string]string{"FDB_PUBLIC_IP": "127.0.0.1,::1"})
if err != nil {
t.Error(err)
return
}
if result != "127.0.0.1" {
t.Logf("Expected result 127.0.0.1, but got result %v", result)
t.Fail()
return
}
})
result, err = argument.GenerateArgument(1, map[string]string{"FDB_PUBLIC_IP": "::1,127.0.0.1"})
if err != nil {
t.Error(err)
return
}
if result != "127.0.0.1" {
t.Logf("Expected result 127.0.0.1, but got result %v", result)
t.Fail()
return
}
When("generating arguments for IPList arguments", func() {
var argument string
var err error
var env map[string]string
var IPFamily int
argument.IPFamily = 6
JustBeforeEach(func() {
testArgument := Argument{ArgumentType: IPListArgumentType, Source: "FDB_PUBLIC_IP", IPFamily: IPFamily}
argument, err = testArgument.GenerateArgument(1, env)
})
result, err = argument.GenerateArgument(1, map[string]string{"FDB_PUBLIC_IP": "127.0.0.1,::1"})
if err != nil {
t.Error(err)
return
}
if result != "::1" {
t.Logf("Expected result ::1, but got result %v", result)
t.Fail()
return
}
When("using IP Family 4", func() {
BeforeEach(func() {
IPFamily = 4
})
result, err = argument.GenerateArgument(1, map[string]string{"FDB_PUBLIC_IP": "::1,127.0.0.1"})
if err != nil {
t.Error(err)
return
}
if result != "::1" {
t.Logf("Expected result ::1, but got result %v", result)
t.Fail()
return
}
When("the env variable is present and has one address with the right IP family", func() {
BeforeEach(func() {
env = map[string]string{"FDB_PUBLIC_IP": "127.0.0.1,::1"}
})
result, err = argument.GenerateArgument(1, map[string]string{"FDB_PUBLIC_IP": "bad,::1"})
if err != nil {
t.Error(err)
return
}
if result != "::1" {
t.Logf("Expected result ::1, but got result %v", result)
t.Fail()
return
}
It("should generate the expected arguments", func() {
Expect(err).NotTo(HaveOccurred())
Expect(argument).To(Equal("127.0.0.1"))
})
})
_, err = argument.GenerateArgument(1, map[string]string{"FDB_PUBLIC_IP": "127.0.0.1"})
if err == nil {
t.Logf("Expected error, but did not get an error")
t.Fail()
return
}
expectedError := "could not find IP with family 6"
if err.Error() != expectedError {
t.Logf("Expected error %s, but got error %s", expectedError, err.Error())
t.Fail()
return
}
When("the env variable is present and has one address with the right IP family with IPv6 first", func() {
BeforeEach(func() {
env = map[string]string{"FDB_PUBLIC_IP": "::1,127.0.0.1"}
})
argument.IPFamily = 5
It("should generate the expected arguments", func() {
Expect(err).NotTo(HaveOccurred())
Expect(argument).To(Equal("127.0.0.1"))
})
})
_, err = argument.GenerateArgument(1, map[string]string{"FDB_PUBLIC_IP": "127.0.0.1"})
if err == nil {
t.Logf("Expected error, but did not get an error")
t.Fail()
return
}
expectedError = "unsupported IP family 5"
if err.Error() != expectedError {
t.Logf("Expected error %s, but got error %s", expectedError, err.Error())
t.Fail()
return
}
}
When("no IPv4 address is present", func() {
BeforeEach(func() {
env = map[string]string{"FDB_PUBLIC_IP": "::1"}
})
func TestLookupEnvForEnvironmentVariable(t *testing.T) {
argument := Argument{ArgumentType: EnvironmentArgumentType, Source: "FDB_ZONE_ID"}
It("should return an error", func() {
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("could not find IP with family 4"))
Expect(argument).To(BeEmpty())
})
})
})
result, err := argument.LookupEnv(map[string]string{"FDB_ZONE_ID": "zone1", "FDB_MACHINE_ID": "machine1"})
if err != nil {
t.Error(err)
return
}
if result != "zone1" {
t.Logf("Expected result zone1, but got result %v", result)
t.Fail()
return
}
When("using IP Family 6", func() {
BeforeEach(func() {
IPFamily = 6
})
_, err = argument.LookupEnv(map[string]string{"FDB_MACHINE_ID": "machine1"})
if err == nil {
t.Logf("Expected error result, but did not get an error")
t.Fail()
return
}
expectedError := "missing environment variable FDB_ZONE_ID"
if err.Error() != expectedError {
t.Logf("Expected error %s, but got error %s", expectedError, err)
t.Fail()
return
}
}
When("the env variable is present and has one address with the right IP family", func() {
BeforeEach(func() {
env = map[string]string{"FDB_PUBLIC_IP": "127.0.0.1,::1"}
})
func TestLookupEnvForIPList(t *testing.T) {
argument := Argument{ArgumentType: IPListArgumentType, Source: "FDB_PUBLIC_IP", IPFamily: 4}
It("should generate the expected arguments", func() {
Expect(err).NotTo(HaveOccurred())
Expect(argument).To(Equal("::1"))
})
})
result, err := argument.LookupEnv(map[string]string{"FDB_PUBLIC_IP": "127.0.0.1,::1"})
if err != nil {
t.Error(err)
return
}
if result != "127.0.0.1" {
t.Logf("Expected result 127.0.0.1, but got result %v", result)
t.Fail()
return
}
When("the env variable is present and has one address with the right IP family with IPv6 first", func() {
BeforeEach(func() {
result, err = argument.LookupEnv(map[string]string{"FDB_PUBLIC_IP": "::1,127.0.0.1"})
if err != nil {
t.Error(err)
return
}
if result != "127.0.0.1" {
t.Logf("Expected result 127.0.0.1, but got result %v", result)
t.Fail()
return
}
env = map[string]string{"FDB_PUBLIC_IP": "::1,127.0.0.1"}
})
argument.IPFamily = 6
It("should generate the expected arguments", func() {
Expect(err).NotTo(HaveOccurred())
Expect(argument).To(Equal("::1"))
})
})
result, err = argument.LookupEnv(map[string]string{"FDB_PUBLIC_IP": "127.0.0.1,::1"})
if err != nil {
t.Error(err)
return
}
if result != "::1" {
t.Logf("Expected result ::1, but got result %v", result)
t.Fail()
return
}
When("a bad address is present", func() {
BeforeEach(func() {
env = map[string]string{"FDB_PUBLIC_IP": "bad,::1"}
})
result, err = argument.LookupEnv(map[string]string{"FDB_PUBLIC_IP": "::1,127.0.0.1"})
if err != nil {
t.Error(err)
return
}
if result != "::1" {
t.Logf("Expected result ::1, but got result %v", result)
t.Fail()
return
}
It("should generate the expected arguments", func() {
Expect(err).NotTo(HaveOccurred())
Expect(argument).To(Equal("::1"))
})
})
result, err = argument.LookupEnv(map[string]string{"FDB_PUBLIC_IP": "bad,::1"})
if err != nil {
t.Error(err)
return
}
if result != "::1" {
t.Logf("Expected result ::1, but got result %v", result)
t.Fail()
return
}
When("no IPv6 address is present", func() {
BeforeEach(func() {
env = map[string]string{"FDB_PUBLIC_IP": "127.0.0.1"}
})
_, err = argument.LookupEnv(map[string]string{"FDB_PUBLIC_IP": "127.0.0.1"})
if err == nil {
t.Logf("Expected error, but did not get an error")
t.Fail()
return
}
expectedError := "could not find IP with family 6"
if err.Error() != expectedError {
t.Logf("Expected error %s, but got error %s", expectedError, err.Error())
t.Fail()
return
}
It("return an error", func() {
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("could not find IP with family 6"))
Expect(argument).To(BeEmpty())
})
})
})
argument.IPFamily = 5
When("using an invalid IP Family", func() {
BeforeEach(func() {
IPFamily = 5
})
_, err = argument.LookupEnv(map[string]string{"FDB_PUBLIC_IP": "127.0.0.1"})
if err == nil {
t.Logf("Expected error, but did not get an error")
t.Fail()
return
}
expectedError = "unsupported IP family 5"
if err.Error() != expectedError {
t.Logf("Expected error %s, but got error %s", expectedError, err.Error())
t.Fail()
return
}
}
It("should return an error", func() {
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("unsupported IP family 5"))
Expect(argument).To(BeEmpty())
})
})
})
})

View File

@ -0,0 +1,37 @@
/*
* suite_test.go
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2023-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package api
import (
"testing"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
func TestAPIs(t *testing.T) {
RegisterFailHandler(Fail)
SetDefaultEventuallyTimeout(10 * time.Second)
RunSpecs(t, "FDB Kubernetes Monitor API")
}

View File

@ -2,7 +2,7 @@
//
// This source file is part of the FoundationDB open source project
//
// Copyright 2021 Apple Inc. and the FoundationDB project authors
// Copyright 2021-2024 Apple Inc. and the FoundationDB project authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -43,6 +43,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect

View File

@ -2,6 +2,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@ -20,6 +21,8 @@ github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhF
github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww=
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
@ -43,6 +46,7 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -73,8 +77,10 @@ github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
@ -85,6 +91,7 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@ -135,12 +142,14 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
@ -159,6 +168,7 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk=
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -218,6 +228,7 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY=
gomodules.xyz/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
@ -246,6 +257,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
@ -263,11 +275,13 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
k8s.io/api v0.26.1 h1:f+SWYiPd/GsiWwVRz+NbFyCgvv75Pk9NK6dlkZgpCRQ=
k8s.io/api v0.26.1/go.mod h1:xd/GBNgR0f707+ATNyPmQ1oyKSgndzXij81FzWGsejg=
k8s.io/apiextensions-apiserver v0.26.1 h1:cB8h1SRk6e/+i3NOrQgSFij1B2S0Y0wDoNl66bn8RMI=
k8s.io/apiextensions-apiserver v0.26.1/go.mod h1:AptjOSXDGuE0JICx/Em15PaoO7buLwTs0dGleIHixSM=
k8s.io/apimachinery v0.26.1 h1:8EZ/eGJL+hY/MYCNwhmDzVqq2lPl3N3Bo8rvweJwXUQ=
k8s.io/apimachinery v0.26.1/go.mod h1:tnPmbONNJ7ByJNz9+n9kMjNP8ON+1qoAIIC70lztu74=
k8s.io/client-go v0.26.1 h1:87CXzYJnAMGaa/IDDfRdhTzxk/wzGZ+/HUQpqgVSZXU=
k8s.io/client-go v0.26.1/go.mod h1:IWNSglg+rQ3OcvDkhY6+QLeasV4OYHDjdqeWkDQZwGE=
k8s.io/component-base v0.26.1 h1:4ahudpeQXHZL5kko+iDHqLj/FSGAEUnSVO0EBbgDd+4=
k8s.io/component-base v0.26.1/go.mod h1:VHrLR0b58oC035w6YQiBSbtsf0ThuSwXP+p5dD/kAWU=
k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=
k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+OGxg8HsuBr/5f6tVAjDu6E=

View File

@ -63,8 +63,11 @@ const (
// PodClient is a wrapper around the pod API.
type PodClient struct {
// metadata is the latest metadata that was seen by the fdb-kubernetes-monitor for the according Pod.
metadata *metav1.PartialObjectMetadata
// podMetadata is the latest metadata that was seen by the fdb-kubernetes-monitor for the according Pod.
podMetadata *metav1.PartialObjectMetadata
// nodeMetadata is the latest metadata that was seen by the fdb-kubernetes-monitor for the according node that hosts the Pod.
nodeMetadata *metav1.PartialObjectMetadata
// TimestampFeed is a channel where the pod client will send updates with
// the values from OutdatedConfigMapAnnotation.
@ -77,17 +80,16 @@ type PodClient struct {
client.Client
}
// CreatePodClient creates a new client for working with the pod object.
func CreatePodClient(ctx context.Context, logger logr.Logger) (*PodClient, error) {
func setupCache(namespace string, podName string, nodeName string) (client.WithWatch, cache.Cache, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
return nil, nil, err
}
scheme := runtime.NewScheme()
err = clientgoscheme.AddToScheme(scheme)
if err != nil {
return nil, err
return nil, nil, err
}
// Create the new client for writes. This client will also be used to setup the cache.
@ -95,12 +97,9 @@ func CreatePodClient(ctx context.Context, logger logr.Logger) (*PodClient, error
Scheme: scheme,
})
if err != nil {
return nil, err
return nil, nil, err
}
namespace := os.Getenv("FDB_POD_NAMESPACE")
podName := os.Getenv("FDB_POD_NAME")
internalCache, err := cache.New(config, cache.Options{
Scheme: scheme,
Mapper: internalClient.RESTMapper(),
@ -109,30 +108,59 @@ func CreatePodClient(ctx context.Context, logger logr.Logger) (*PodClient, error
&corev1.Pod{}: {
Field: fields.OneTermEqualSelector(metav1.ObjectNameField, podName),
},
&corev1.Node{}: {
Field: fields.OneTermEqualSelector(metav1.ObjectNameField, nodeName),
},
},
})
if err != nil {
return nil, err
return nil, nil, err
}
// Fetch the informer for the FoundationDBCluster resource
informer, err := internalCache.GetInformer(ctx, &corev1.Pod{})
if err != nil {
return nil, err
}
return internalClient, internalCache, nil
}
// CreatePodClient creates a new client for working with the pod object.
func CreatePodClient(ctx context.Context, logger logr.Logger, enableNodeWatcher bool, setupCache func(string, string, string) (client.WithWatch, cache.Cache, error)) (*PodClient, error) {
namespace := os.Getenv("FDB_POD_NAMESPACE")
podName := os.Getenv("FDB_POD_NAME")
nodeName := os.Getenv("FDB_NODE_NAME")
internalClient, internalCache, err := setupCache(namespace, podName, nodeName)
podClient := &PodClient{
metadata: nil,
podMetadata: nil,
nodeMetadata: nil,
TimestampFeed: make(chan int64, 10),
Logger: logger,
}
// Setup an event handler to make sue we get events for new clusters and directly reload.
_, err = informer.AddEventHandler(podClient)
// Fetch the informer for the Pod resource.
podInformer, err := internalCache.GetInformer(ctx, &corev1.Pod{})
if err != nil {
return nil, err
}
// Setup an event handler to make sure we get events for the Pod and directly reload the information.
_, err = podInformer.AddEventHandler(podClient)
if err != nil {
return nil, err
}
if enableNodeWatcher {
var nodeInformer cache.Informer
// Fetch the informer for the node resource.
nodeInformer, err = internalCache.GetInformer(ctx, &corev1.Node{})
if err != nil {
return nil, err
}
// Setup an event handler to make sure we get events for the node and directly reload the information.
_, err = nodeInformer.AddEventHandler(podClient)
if err != nil {
return nil, err
}
}
// Make sure the internal cache is started.
go func() {
_ = internalCache.Start(ctx)
@ -154,14 +182,27 @@ func CreatePodClient(ctx context.Context, logger logr.Logger) (*PodClient, error
podClient.Client = controllerClient
// Fetch the current metadata before returning the PodClient
currentMetadata := &metav1.PartialObjectMetadata{}
currentMetadata.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Pod"))
err = podClient.Client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: podName}, currentMetadata)
currentPodMetadata := &metav1.PartialObjectMetadata{}
currentPodMetadata.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Pod"))
err = podClient.Client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: podName}, currentPodMetadata)
if err != nil {
return nil, err
}
podClient.metadata = currentMetadata
podClient.podMetadata = currentPodMetadata
// Only if the fdb-kubernetes-monitor should update the node information, add the watcher here by fetching the node
// information once during start up.
if enableNodeWatcher {
currentNodeMetadata := &metav1.PartialObjectMetadata{}
currentNodeMetadata.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Node"))
err = podClient.Client.Get(ctx, client.ObjectKey{Name: nodeName}, currentNodeMetadata)
if err != nil {
return nil, err
}
podClient.nodeMetadata = currentNodeMetadata
}
return podClient, nil
}
@ -212,7 +253,7 @@ func (podClient *PodClient) updateFdbClusterTimestampAnnotation() error {
// updateAnnotationsOnPod will update the annotations with the provided annotationChanges. If an annotation exists, it
// will be updated if the annotation is absent it will be added.
func (podClient *PodClient) updateAnnotationsOnPod(annotationChanges map[string]string) error {
annotations := podClient.metadata.Annotations
annotations := podClient.podMetadata.Annotations
if len(annotations) == 0 {
annotations = map[string]string{}
}
@ -227,8 +268,8 @@ func (podClient *PodClient) updateAnnotationsOnPod(annotationChanges map[string]
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: podClient.metadata.Namespace,
Name: podClient.metadata.Name,
Namespace: podClient.podMetadata.Namespace,
Name: podClient.podMetadata.Name,
Annotations: annotations,
},
}, client.Apply, client.FieldOwner("fdb-kubernetes-monitor"), client.ForceOwnership)
@ -236,53 +277,57 @@ func (podClient *PodClient) updateAnnotationsOnPod(annotationChanges map[string]
// OnAdd is called when an object is added.
func (podClient *PodClient) OnAdd(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return
}
podClient.Logger.Info("Got event for OnAdd", "name", pod.Name, "namespace", pod.Namespace)
podClient.metadata = &metav1.PartialObjectMetadata{
TypeMeta: pod.TypeMeta,
ObjectMeta: pod.ObjectMeta,
switch castedObj := obj.(type) {
case *corev1.Pod:
podClient.Logger.Info("Got event for OnAdd for Pod resource", "name", castedObj.Name, "namespace", castedObj.Namespace)
podClient.podMetadata = &metav1.PartialObjectMetadata{
TypeMeta: castedObj.TypeMeta,
ObjectMeta: castedObj.ObjectMeta,
}
case *corev1.Node:
podClient.Logger.Info("Got event for OnAdd for Node resource", "name", castedObj.Name)
podClient.nodeMetadata = &metav1.PartialObjectMetadata{
TypeMeta: castedObj.TypeMeta,
ObjectMeta: castedObj.ObjectMeta,
}
}
}
// OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
// were combined together, so you can't use this to see every single
// change. OnUpdate is also called when a re-list happens, and it will
// OnUpdate is also called when a re-list happens, and it will
// get called even if nothing changed. This is useful for periodically
// evaluating or syncing something.
func (podClient *PodClient) OnUpdate(_, newObj interface{}) {
pod, ok := newObj.(*corev1.Pod)
if !ok {
return
switch castedObj := newObj.(type) {
case *corev1.Pod:
podClient.Logger.Info("Got event for OnUpdate for Pod resource", "name", castedObj.Name, "namespace", castedObj.Namespace, "generation", castedObj.Generation)
podClient.podMetadata = &metav1.PartialObjectMetadata{
TypeMeta: castedObj.TypeMeta,
ObjectMeta: castedObj.ObjectMeta,
}
if podClient.podMetadata.Annotations == nil {
return
}
annotation := podClient.podMetadata.Annotations[OutdatedConfigMapAnnotation]
if annotation == "" {
return
}
timestamp, err := strconv.ParseInt(annotation, 10, 64)
if err != nil {
podClient.Logger.Error(err, "Error parsing annotation", "key", OutdatedConfigMapAnnotation, "rawAnnotation", annotation)
return
}
podClient.TimestampFeed <- timestamp
case *corev1.Node:
podClient.Logger.Info("Got event for OnUpdate for Node resource", "name", castedObj.Name)
podClient.nodeMetadata = &metav1.PartialObjectMetadata{
TypeMeta: castedObj.TypeMeta,
ObjectMeta: castedObj.ObjectMeta,
}
}
podClient.Logger.Info("Got event for OnUpdate", "name", pod.Name, "namespace", pod.Namespace, "generation", pod.Generation)
podClient.metadata = &metav1.PartialObjectMetadata{
TypeMeta: pod.TypeMeta,
ObjectMeta: pod.ObjectMeta,
}
if podClient.metadata == nil {
return
}
annotation := podClient.metadata.Annotations[OutdatedConfigMapAnnotation]
if annotation == "" {
return
}
timestamp, err := strconv.ParseInt(annotation, 10, 64)
if err != nil {
podClient.Logger.Error(err, "Error parsing annotation", "key", OutdatedConfigMapAnnotation, "rawAnnotation", annotation)
return
}
podClient.TimestampFeed <- timestamp
}
// OnDelete will get the final state of the item if it is known, otherwise
@ -290,15 +335,12 @@ func (podClient *PodClient) OnUpdate(_, newObj interface{}) {
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
func (podClient *PodClient) OnDelete(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return
}
podClient.Logger.Info("Got event for OnDelete", "name", pod.Name, "namespace", pod.Namespace)
podClient.metadata = &metav1.PartialObjectMetadata{
TypeMeta: pod.TypeMeta,
ObjectMeta: pod.ObjectMeta,
switch castedObj := obj.(type) {
case *corev1.Pod:
podClient.Logger.Info("Got event for OnDelete for Pod resource", "name", castedObj.Name, "namespace", castedObj.Namespace)
podClient.podMetadata = nil
case *corev1.Node:
podClient.Logger.Info("Got event for OnDelete for Node resource", "name", castedObj.Name)
podClient.nodeMetadata = nil
}
}

View File

@ -0,0 +1,393 @@
// kubernetes_test.go
//
// This source file is part of the FoundationDB open source project
//
// Copyright 2023-2024 Apple Inc. and the FoundationDB project authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package main
import (
"context"
"strconv"
"time"
"github.com/apple/foundationdb/fdbkubernetesmonitor/api"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
)
var _ = Describe("Testing FDB Pod client", func() {
var enableNodeWatcher bool
var fakeClient client.WithWatch
var podClient *PodClient
var namespace, podName, nodeName string
var internalCache *informertest.FakeInformers
BeforeEach(func() {
scheme := runtime.NewScheme()
Expect(clientgoscheme.AddToScheme(scheme)).NotTo(HaveOccurred())
fakeClient = fake.NewClientBuilder().WithScheme(scheme).Build()
namespace = "fdb-testing"
podName = "storage-1"
nodeName = "node1"
GinkgoT().Setenv("FDB_POD_NAMESPACE", namespace)
GinkgoT().Setenv("FDB_POD_NAME", podName)
GinkgoT().Setenv("FDB_NODE_NAME", nodeName)
internalCache = &informertest.FakeInformers{}
internalCache.Scheme = fakeClient.Scheme()
})
When("the PodClient was started", func() {
JustBeforeEach(func() {
var err error
podClient, err = CreatePodClient(context.Background(), GinkgoLogr, enableNodeWatcher, func(fncNamespace string, fncPodName string, fncNodeName string) (client.WithWatch, cache.Cache, error) {
Expect(fncNamespace).To(Equal(namespace))
Expect(fncPodName).To(Equal(podName))
Expect(fncNodeName).To(Equal(nodeName))
return fakeClient, internalCache, nil
})
Expect(err).NotTo(HaveOccurred())
})
When("the node watch feature is disabled", func() {
BeforeEach(func() {
enableNodeWatcher = false
})
It("should have the metadata for the pod but not the node", func() {
Expect(podClient.podMetadata).NotTo(BeNil())
Expect(podClient.nodeMetadata).To(BeNil())
Expect(internalCache.InformersByGVK).To(HaveLen(1))
})
})
When("the node watch feature is enabled", func() {
BeforeEach(func() {
enableNodeWatcher = true
})
It("should have the metadata for the pod and node", func() {
Expect(podClient.podMetadata).NotTo(BeNil())
Expect(podClient.nodeMetadata).NotTo(BeNil())
Expect(internalCache.InformersByGVK).To(HaveLen(2))
})
})
})
When("the PodClient handles events", func() {
BeforeEach(func() {
var err error
podClient, err = CreatePodClient(context.Background(), GinkgoLogr, enableNodeWatcher, func(fncNamespace string, fncPodName string, fncNodeName string) (client.WithWatch, cache.Cache, error) {
Expect(fncNamespace).To(Equal(namespace))
Expect(fncPodName).To(Equal(podName))
Expect(fncNodeName).To(Equal(nodeName))
return fakeClient, internalCache, nil
})
Expect(err).NotTo(HaveOccurred())
})
When("events for the pod are received", func() {
var fakeInformer *controllertest.FakeInformer
var pod *corev1.Pod
BeforeEach(func() {
pod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: namespace,
Labels: map[string]string{
"testing": "testing",
},
},
}
var err error
fakeInformer, err = internalCache.FakeInformerFor(pod)
Expect(err).NotTo(HaveOccurred())
})
When("an AddEvent is handled", func() {
BeforeEach(func() {
fakeInformer.Add(pod)
})
It("should update the pod information", func() {
Expect(podClient.podMetadata).NotTo(BeNil())
Expect(podClient.podMetadata.Name).To(Equal(podName))
Expect(podClient.podMetadata.Namespace).To(Equal(namespace))
Expect(podClient.podMetadata.Labels).To(Equal(map[string]string{
"testing": "testing",
}))
})
})
When("an UpdateEvent is handled", func() {
BeforeEach(func() {
fakeInformer.Update(nil, pod)
})
It("should update the pod information", func() {
Expect(podClient.podMetadata).NotTo(BeNil())
Expect(podClient.podMetadata.Name).To(Equal(podName))
Expect(podClient.podMetadata.Namespace).To(Equal(namespace))
Expect(podClient.podMetadata.Labels).To(Equal(map[string]string{
"testing": "testing",
}))
Expect(podClient.TimestampFeed).NotTo(Receive())
})
})
When("an UpdateEvent is handled that updates the OutdatedConfigMapAnnotation", func() {
var timestamp int64
BeforeEach(func() {
timestamp = time.Now().Unix()
pod.Annotations = map[string]string{
OutdatedConfigMapAnnotation: strconv.FormatInt(timestamp, 10),
}
fakeInformer.Update(nil, pod)
})
It("should update the pod information and receive an event", func() {
Expect(podClient.podMetadata).NotTo(BeNil())
Expect(podClient.podMetadata.Name).To(Equal(podName))
Expect(podClient.podMetadata.Namespace).To(Equal(namespace))
Expect(podClient.podMetadata.Labels).To(Equal(map[string]string{
"testing": "testing",
}))
Expect(podClient.TimestampFeed).To(Receive(&timestamp))
})
})
When("an UpdateEvent is handled that updates the OutdatedConfigMapAnnotation with a bad value", func() {
BeforeEach(func() {
pod.Annotations = map[string]string{
OutdatedConfigMapAnnotation: "boom!",
}
fakeInformer.Update(nil, pod)
})
It("should update the Pod information", func() {
Expect(podClient.podMetadata).NotTo(BeNil())
Expect(podClient.podMetadata.Name).To(Equal(podName))
Expect(podClient.podMetadata.Namespace).To(Equal(namespace))
Expect(podClient.podMetadata.Labels).To(Equal(map[string]string{
"testing": "testing",
}))
Expect(podClient.TimestampFeed).NotTo(Receive())
})
})
When("a DeleteEvent is handled", func() {
BeforeEach(func() {
fakeInformer.Delete(pod)
})
It("should remove the pod information", func() {
Expect(podClient.podMetadata).To(BeNil())
})
})
})
When("events for the node are received", func() {
var fakeInformer *controllertest.FakeInformer
var node *corev1.Node
BeforeEach(func() {
node = &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: namespace,
Labels: map[string]string{
"testing": "testing",
},
},
}
var err error
fakeInformer, err = internalCache.FakeInformerFor(node)
Expect(err).NotTo(HaveOccurred())
})
When("an AddEvent is handled", func() {
BeforeEach(func() {
fakeInformer.Add(node)
})
It("should update the node information", func() {
Expect(podClient.nodeMetadata).NotTo(BeNil())
Expect(podClient.nodeMetadata.Name).To(Equal(podName))
Expect(podClient.nodeMetadata.Namespace).To(Equal(namespace))
Expect(podClient.nodeMetadata.Labels).To(Equal(map[string]string{
"testing": "testing",
}))
})
})
When("an UpdateEvent is handled", func() {
BeforeEach(func() {
fakeInformer.Update(nil, node)
})
It("should update the node information", func() {
Expect(podClient.nodeMetadata).NotTo(BeNil())
Expect(podClient.nodeMetadata.Name).To(Equal(podName))
Expect(podClient.nodeMetadata.Namespace).To(Equal(namespace))
Expect(podClient.nodeMetadata.Labels).To(Equal(map[string]string{
"testing": "testing",
}))
})
})
When("a DeleteEvent is handled", func() {
BeforeEach(func() {
fakeInformer.Delete(node)
})
It("should remove the node information", func() {
Expect(podClient.nodeMetadata).To(BeNil())
})
})
})
})
When("the PodClient should update the annotations", func() {
var monitor *Monitor
JustBeforeEach(func() {
var err error
podClient, err = CreatePodClient(context.Background(), GinkgoLogr, enableNodeWatcher, func(fncNamespace string, fncPodName string, fncNodeName string) (client.WithWatch, cache.Cache, error) {
Expect(fncNamespace).To(Equal(namespace))
Expect(fncPodName).To(Equal(podName))
Expect(fncNodeName).To(Equal(nodeName))
return fakeClient, internalCache, nil
})
Expect(err).NotTo(HaveOccurred())
// We have to set this value here as we haven't received any update yet and the fake implementation is not
// passing through the fake client.
podClient.podMetadata = &metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: namespace,
},
}
// Make sure to create the Pod in the fake client otherwise it cannot be patched.
Expect(fakeClient.Create(context.Background(), &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: namespace,
},
})).NotTo(HaveOccurred())
// Execute the update annotations.
Expect(podClient.UpdateAnnotations(monitor)).NotTo(HaveOccurred())
})
When("no additional env variables are set", func() {
BeforeEach(func() {
monitor = &Monitor{
ActiveConfiguration: &api.ProcessConfiguration{
BinaryPath: "/usr/bin",
},
}
})
It("should update the annotations", func() {
pod := &corev1.Pod{}
Expect(fakeClient.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: podName}, pod)).NotTo(HaveOccurred())
Expect(pod.Annotations).To(HaveKeyWithValue(CurrentConfigurationAnnotation, ""))
Expect(pod.Annotations).To(HaveKeyWithValue(EnvironmentAnnotation, "{\"BINARY_DIR\":\"/usr\"}"))
})
})
When("one flat additional env variable is set", func() {
BeforeEach(func() {
GinkgoT().Setenv("TEST", "test-value")
monitor = &Monitor{
ActiveConfiguration: &api.ProcessConfiguration{
BinaryPath: "/usr/bin",
Arguments: []api.Argument{
{
ArgumentType: EnvironmentAnnotation,
Source: "TEST",
},
},
},
}
})
It("should update the annotations", func() {
pod := &corev1.Pod{}
Expect(fakeClient.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: podName}, pod)).NotTo(HaveOccurred())
Expect(pod.Annotations).To(HaveKeyWithValue(CurrentConfigurationAnnotation, ""))
Expect(pod.Annotations).To(HaveKeyWithValue(EnvironmentAnnotation, "{\"BINARY_DIR\":\"/usr\",\"TEST\":\"test-value\"}"))
})
})
When("one nested flat additional env variable is set", func() {
BeforeEach(func() {
GinkgoT().Setenv("TEST", "test-value")
monitor = &Monitor{
ActiveConfiguration: &api.ProcessConfiguration{
BinaryPath: "/usr/bin",
Arguments: []api.Argument{
{
ArgumentType: api.ConcatenateArgumentType,
Values: []api.Argument{
{
ArgumentType: api.EnvironmentArgumentType,
Source: "TEST",
},
},
},
},
},
}
})
It("should update the annotations", func() {
pod := &corev1.Pod{}
Expect(fakeClient.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: podName}, pod)).NotTo(HaveOccurred())
Expect(pod.Annotations).To(HaveKeyWithValue(CurrentConfigurationAnnotation, ""))
Expect(pod.Annotations).To(HaveKeyWithValue(EnvironmentAnnotation, "{\"BINARY_DIR\":\"/usr\",\"TEST\":\"test-value\"}"))
})
})
})
})

View File

@ -49,6 +49,7 @@ var (
listenAddress string
processCount int
enablePprof bool
enableNodeWatch bool
)
type executionMode string
@ -101,6 +102,7 @@ func main() {
pflag.IntVar(&processCount, "process-count", 1, "The number of processes to start")
pflag.BoolVar(&enablePprof, "enable-pprof", false, "Enables /debug/pprof endpoints on the listen address")
pflag.StringVar(&listenAddress, "listen-address", ":8081", "An address and port to listen on")
pflag.BoolVar(&enableNodeWatch, "enable-node-watch", false, "Enables the fdb-kubernetes-monitor to watch the node resource where the current Pod is running. This can be used to read node labels")
pflag.Parse()
logger := zapr.NewLogger(initLogger(logPath))
@ -124,7 +126,7 @@ func main() {
logger.Error(err, "Error loading additional environment")
os.Exit(1)
}
StartMonitor(context.Background(), logger, path.Join(inputDir, monitorConfFile), customEnvironment, processCount, listenAddress, enablePprof, currentContainerVersion)
StartMonitor(context.Background(), logger, path.Join(inputDir, monitorConfFile), customEnvironment, processCount, listenAddress, enablePprof, currentContainerVersion, enableNodeWatch)
case executionModeInit:
err = CopyFiles(logger, outputDir, copyDetails, requiredCopies)
if err != nil {

View File

@ -2,7 +2,7 @@
//
// This source file is part of the FoundationDB open source project
//
// Copyright 2021 Apple Inc. and the FoundationDB project authors
// Copyright 2021-2024 Apple Inc. and the FoundationDB project authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -32,6 +32,7 @@ import (
"os/exec"
"os/signal"
"path"
"strings"
"sync"
"syscall"
"time"
@ -106,8 +107,8 @@ type Monitor struct {
}
// StartMonitor starts the monitor loop.
func StartMonitor(ctx context.Context, logger logr.Logger, configFile string, customEnvironment map[string]string, processCount int, listenAddr string, enableDebug bool, currentContainerVersion string) {
podClient, err := CreatePodClient(ctx, logger)
func StartMonitor(ctx context.Context, logger logr.Logger, configFile string, customEnvironment map[string]string, processCount int, listenAddr string, enableDebug bool, currentContainerVersion string, enableNodeWatcher bool) {
podClient, err := CreatePodClient(ctx, logger, enableNodeWatcher, setupCache)
if err != nil {
logger.Error(err, "could not create Pod client")
os.Exit(1)
@ -161,6 +162,37 @@ func StartMonitor(ctx context.Context, logger logr.Logger, configFile string, cu
monitor.Run()
}
// updateCustomEnvironment will add the node labels and their values to the custom environment map. All the generated
// environment variables will start with NODE_LABEL and "/" and "." will be replaced in the key as "_", e.g. from the
// label "foundationdb.org/testing = awesome" the env variables NODE_LABEL_FOUNDATIONDB_ORG_TESTING = awesome" will be
// generated.
func (monitor *Monitor) updateCustomEnvironmentFromNodeMetadata() {
if monitor.PodClient.nodeMetadata == nil {
return
}
nodeLabels := monitor.PodClient.nodeMetadata.Labels
for key, value := range nodeLabels {
sanitizedKey := strings.ReplaceAll(key, "/", "_")
sanitizedKey = strings.ReplaceAll(sanitizedKey, ".", "_")
envKey := "NODE_LABEL_" + strings.ToUpper(sanitizedKey)
currentValue, ok := monitor.CustomEnvironment[envKey]
if !ok {
monitor.Logger.Info("adding new custom environment variable from node labels", "key", envKey, "value", value)
monitor.CustomEnvironment[envKey] = value
continue
}
if currentValue == value {
continue
}
monitor.Logger.Info("update custom environment variable from node labels", "key", envKey, "newValue", value, "currentValue", currentValue)
monitor.CustomEnvironment[envKey] = value
continue
}
}
// LoadConfiguration loads the latest configuration from the config file.
func (monitor *Monitor) LoadConfiguration() {
file, err := os.Open(monitor.ConfigFile)
@ -168,7 +200,10 @@ func (monitor *Monitor) LoadConfiguration() {
monitor.Logger.Error(err, "Error reading monitor config file", "monitorConfigPath", monitor.ConfigFile)
return
}
defer file.Close()
defer func() {
err := file.Close()
monitor.Logger.Error(err, "Error could not close file", "monitorConfigPath", monitor.ConfigFile)
}()
configuration := &api.ProcessConfiguration{}
configurationBytes, err := io.ReadAll(file)
if err != nil {
@ -192,6 +227,8 @@ func (monitor *Monitor) LoadConfiguration() {
return
}
monitor.updateCustomEnvironmentFromNodeMetadata()
_, err = configuration.GenerateArguments(1, monitor.CustomEnvironment)
if err != nil {
monitor.Logger.Error(err, "Error generating arguments for latest configuration", "configuration", configuration, "binaryPath", configuration.BinaryPath)
@ -375,8 +412,7 @@ func (monitor *Monitor) processRequired(processNumber int) bool {
monitor.Mutex.Lock()
defer monitor.Mutex.Unlock()
logger := monitor.Logger.WithValues("processNumber", processNumber, "area", "processRequired")
runProcesses := pointer.BoolDeref(monitor.ActiveConfiguration.RunServers, true)
if monitor.ProcessCount < processNumber || !runProcesses {
if monitor.ProcessCount < processNumber || !pointer.BoolDeref(monitor.ActiveConfiguration.RunServers, true) {
if monitor.ProcessIDs[processNumber] != 0 {
logger.Info("Terminating run loop")
monitor.ProcessIDs[processNumber] = 0
@ -467,7 +503,7 @@ func (monitor *Monitor) Run() {
}
}
annotations := monitor.PodClient.metadata.Annotations
annotations := monitor.PodClient.podMetadata.Annotations
if len(annotations) > 0 {
delayValue, ok := annotations[DelayShutdownAnnotation]
if ok {

View File

@ -24,9 +24,56 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var _ = Describe("Testing FDB Kubernetes Monitor", func() {
When("updating the custom environment variables from the node metadata", func() {
var monitor *Monitor
BeforeEach(func() {
monitor = &Monitor{
Logger: GinkgoLogr,
CustomEnvironment: map[string]string{
"testing": "testing",
},
PodClient: &PodClient{},
}
})
JustBeforeEach(func() {
monitor.updateCustomEnvironmentFromNodeMetadata()
})
When("no node metadata is present", func() {
It("shouldn't add new entries", func() {
Expect(monitor.CustomEnvironment).To(HaveLen(1))
Expect(monitor.CustomEnvironment).To(HaveKeyWithValue("testing", "testing"))
})
})
When("node metadata is present", func() {
BeforeEach(func() {
monitor.PodClient.nodeMetadata = &metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
// Examples are taken from: https://kubernetes.io/docs/reference/labels-annotations-taints/
"topology.kubernetes.io/zone": "us-east-1c",
"kubernetes.io/hostname": "ip-172-20-114-199.ec2.internal",
},
},
}
})
It("should add the new entries", func() {
Expect(monitor.CustomEnvironment).To(HaveLen(3))
Expect(monitor.CustomEnvironment).To(HaveKeyWithValue("testing", "testing"))
Expect(monitor.CustomEnvironment).To(HaveKeyWithValue("NODE_LABEL_TOPOLOGY_KUBERNETES_IO_ZONE", "us-east-1c"))
Expect(monitor.CustomEnvironment).To(HaveKeyWithValue("NODE_LABEL_KUBERNETES_IO_HOSTNAME", "ip-172-20-114-199.ec2.internal"))
})
})
})
DescribeTable("when getting the backoff time", func(errorCount int, expected time.Duration) {
Expect(getBackoffDuration(errorCount)).To(Equal(expected))
},

View File

@ -3,7 +3,7 @@
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2023 Apple Inc. and the FoundationDB project authors
* Copyright 2023-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.