Merge pull request #11393 from gm42/fix/add-stop-network

Go binding: add StopNetwork()
This commit is contained in:
Jingyu Zhou 2024-08-03 10:46:02 -07:00 committed by GitHub
commit 6d323de0ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 110 additions and 38 deletions

View File

@ -49,7 +49,9 @@ func (e Error) Error() string {
// SOMEDAY: these (along with others) should be coming from fdb.options?
var (
errNetworkNotSetup = Error{2008}
errNetworkNotSetup = Error{2008}
errNetworkAlreadySetup = Error{2009} // currently unused
errNetworkCannotBeRestarted = Error{2025} // currently unused
errAPIVersionUnset = Error{2200}
errAPIVersionAlreadySet = Error{2201}

View File

@ -31,11 +31,22 @@ import (
"bytes"
"errors"
"fmt"
"log"
"sync"
"unsafe"
)
var (
// ErrNetworkAlreadyStopped for multiple calls to StopNetwork().
ErrNetworkAlreadyStopped = errors.New("network has already been stopped")
// ErrNetworkIsStopped is returned when attempting to execute a function which needs to interact
// with the network thread while the network thread is no more running.
ErrNetworkIsStopped = errors.New("network is stopped")
// ErrNetworkAlreadyStopped for a too early call to StopNetwork().
ErrNetworkNotStarted = errors.New("network has not been started")
)
// Would put this in futures.go but for the documented issue with
// exports and functions in preamble
// (https://code.google.com/p/go-wiki/wiki/cgo#Global_functions)
@ -108,6 +119,7 @@ func (opt NetworkOptions) setOpt(code int, param []byte) error {
// version is not supported by both the fdb package and the FoundationDB C
// library, an error will be returned. APIVersion must be called prior to any
// other functions in the fdb package.
// This function is safe to be called from multiple goroutines.
//
// Currently, this package supports API versions 200 through 740.
//
@ -192,14 +204,28 @@ func MustGetAPIVersion() int {
}
var apiVersion int
var networkStarted bool
var networkStarted, networkStopped bool
var networkMutex sync.RWMutex
var networkRunning sync.WaitGroup
var openDatabases sync.Map
func startNetwork() error {
// executeWithRunningNetworkThread starts the internal network event loop, if not already done,
// then runs the provided function while network thread is running.
// This function is safe to be called from multiple goroutines.
func executeWithRunningNetworkThread(f func()) error {
networkMutex.RLock()
if networkStopped {
networkMutex.RUnlock()
return ErrNetworkIsStopped
}
if networkStarted {
// network thread is guaranteed to be running while this user-provided function runs
f()
networkMutex.RUnlock()
return nil
}
@ -207,35 +233,67 @@ func startNetwork() error {
networkMutex.RUnlock()
networkMutex.Lock()
defer networkMutex.Unlock()
if networkStarted {
return nil
if networkStopped {
return ErrNetworkIsStopped
}
if e := C.fdb_setup_network(); e != 0 {
return Error{int(e)}
}
go func() {
e := C.fdb_run_network()
if e != 0 {
log.Printf("Unhandled error in FoundationDB network thread: %v (%v)\n", C.GoString(C.fdb_get_error(e)), e)
// check if meanwhile another goroutine started the network thread
if !networkStarted {
if e := C.fdb_setup_network(); e != 0 {
return Error{int(e)}
}
}()
networkStarted = true
networkRunning.Add(1)
go func() {
e := C.fdb_run_network()
networkRunning.Done()
if e != 0 {
panic(fmt.Sprintf("Unhandled error in FoundationDB network thread: %v (%v)\n", C.GoString(C.fdb_get_error(e)), e))
}
}()
networkStarted = true
}
// network thread is guaranteed to be running while this user-provided function runs
f()
return nil
}
// Deprecated: the network is started automatically when a database is opened.
// StartNetwork initializes the FoundationDB client networking engine. StartNetwork
// must not be called more than once.
// StartNetwork does nothing, but it will ensure that the API version is set and return an error otherwise.
func StartNetwork() error {
if apiVersion == 0 {
return errAPIVersionUnset
}
return startNetwork()
return nil
}
// StopNetwork signals the internal network event loop to terminate and waits for its termination.
// This function is safe to be called from multiple goroutines.
// This function returns an error if network has not yet started or if network has already been stopped.
// See also: https://github.com/apple/foundationdb/issues/3015
func StopNetwork() error {
networkMutex.Lock()
defer networkMutex.Unlock()
if !networkStarted {
return ErrNetworkNotStarted
}
if networkStopped {
return ErrNetworkAlreadyStopped
}
C.fdb_stop_network()
networkRunning.Wait()
networkStopped = true
return nil
}
// DefaultClusterFile should be passed to fdb.Open to allow the FoundationDB C
@ -272,10 +330,6 @@ func MustOpenDefault() Database {
// the multi-version client API must be used.
// Caller must call Close() to release resources.
func OpenDatabase(clusterFile string) (Database, error) {
if err := ensureNetworkIsStarted(); err != nil {
return Database{}, err
}
var db Database
var okDb bool
anyy, exist := openDatabases.Load(clusterFile)
@ -291,15 +345,6 @@ func OpenDatabase(clusterFile string) (Database, error) {
return db, nil
}
// ensureNetworkIsStarted starts the network if not already done and ensures that the API version is set.
func ensureNetworkIsStarted() error {
if apiVersion == 0 {
return errAPIVersionUnset
}
return startNetwork()
}
// MustOpenDatabase is like OpenDatabase but panics if the default database cannot
// be opened.
func MustOpenDatabase(clusterFile string) Database {
@ -332,6 +377,10 @@ func MustOpen(clusterFile string, dbName []byte) Database {
// createDatabase is the internal function used to create a database.
// Caller must call Close() to release resources.
func createDatabase(clusterFile string) (Database, error) {
if apiVersion == 0 {
return Database{}, errAPIVersionUnset
}
var cf *C.char
if len(clusterFile) != 0 {
@ -340,8 +389,16 @@ func createDatabase(clusterFile string) (Database, error) {
}
var outdb *C.FDBDatabase
if err := C.fdb_create_database(cf, &outdb); err != 0 {
return Database{}, Error{int(err)}
var createErr error
if err := executeWithRunningNetworkThread(func() {
if err := C.fdb_create_database(cf, &outdb); err != 0 {
createErr = Error{int(err)}
}
}); err != nil {
return Database{}, err
}
if createErr != nil {
return Database{}, createErr
}
db := &database{outdb}
@ -354,8 +411,8 @@ func createDatabase(clusterFile string) (Database, error) {
// to the database only for a short time e.g. to test different connection strings.
// Caller must call Close() to release resources.
func OpenWithConnectionString(connectionString string) (Database, error) {
if err := ensureNetworkIsStarted(); err != nil {
return Database{}, err
if apiVersion == 0 {
return Database{}, errAPIVersionUnset
}
var cf *C.char
@ -368,8 +425,16 @@ func OpenWithConnectionString(connectionString string) (Database, error) {
defer C.free(unsafe.Pointer(cf))
var outdb *C.FDBDatabase
if err := C.fdb_create_database_from_connection_string(cf, &outdb); err != 0 {
return Database{}, Error{int(err)}
var createErr error
if err := executeWithRunningNetworkThread(func() {
if err := C.fdb_create_database_from_connection_string(cf, &outdb); err != 0 {
createErr = Error{int(err)}
}
}); err != nil {
return Database{}, err
}
if createErr != nil {
return Database{}, createErr
}
db := &database{outdb}
@ -380,6 +445,7 @@ func OpenWithConnectionString(connectionString string) (Database, error) {
// Deprecated: Use OpenDatabase instead.
// CreateCluster returns a cluster handle to the FoundationDB cluster identified
// by the provided cluster file.
// This function is safe to be called from multiple goroutines.
func CreateCluster(clusterFile string) (Cluster, error) {
networkMutex.Lock()
defer networkMutex.Unlock()
@ -392,6 +458,10 @@ func CreateCluster(clusterFile string) (Cluster, error) {
return Cluster{}, errNetworkNotSetup
}
if networkStopped {
return Cluster{}, ErrNetworkIsStopped
}
return Cluster{clusterFile}, nil
}