Merge branch 'master' into feature-remote-logs

This commit is contained in:
Evan Tschannen 2018-05-08 13:28:23 -07:00
commit 9f0d244efe
31 changed files with 448 additions and 165 deletions

View File

@ -5,7 +5,7 @@ FDBLibTLS_LDFLAGS += -Wl,-soname,FDBLibTLS.so -Wl,--version-script=FDBLibTLS/FDB
# The plugin isn't a typical library, so it feels more sensible to have a copy
# of it in bin/.
bin/FDBLibTLS.$(DLEXT): lib/libFDBLibTLS.$(DLEXT)
bin/fdb-libressl-plugin.$(DLEXT): lib/libFDBLibTLS.$(DLEXT)
@cp $< $@
TARGETS += bin/FDBLibTLS.$(DLEXT)
TARGETS += bin/fdb-libressl-plugin.$(DLEXT)

View File

@ -15,7 +15,11 @@ ifeq ($(MONO),)
MONO := /usr/bin/mono
endif
MCS := $(shell which dmcs)
DMCS := $(shell which dmcs)
MCS := $(shell which mcs)
ifneq ($(DMCS),)
MCS := $(DMCS)
endif
ifeq ($(MCS),)
MCS := /usr/bin/dmcs
endif

View File

@ -44,7 +44,6 @@ fdb_error_t waitError(FDBFuture *f) {
struct RunResult run(struct ResultSet *rs, FDBDatabase *db, struct RunResult (*func)(struct ResultSet*, FDBTransaction*)) {
FDBTransaction *tr = NULL;
checkError(fdb_database_create_transaction(db, &tr), "create transaction", rs);
fdb_error_t e = fdb_database_create_transaction(db, &tr);
checkError(e, "create transaction", rs);
@ -62,9 +61,11 @@ struct RunResult run(struct ResultSet *rs, FDBDatabase *db, struct RunResult (*f
fdb_error_t retryE = waitError(f);
fdb_future_destroy(f);
if (retryE) {
fdb_transaction_destroy(tr);
return (struct RunResult) {0, retryE};
}
} else {
fdb_transaction_destroy(tr);
return r;
}
}

View File

@ -236,6 +236,7 @@ void runTests(struct ResultSet *rs) {
runTest(&clearRangeGetRange, tr, rs, "C: get range cached values with clear ranges throughput");
runTest(&interleavedSetsGets, tr, rs, "C: interleaved sets and gets on a single key throughput");
fdb_transaction_destroy(tr);
fdb_database_destroy(db);
fdb_stop_network();
}

View File

@ -23,8 +23,32 @@ if [[ "${platform}" == "Darwin" ]] ; then
FDBLIBDIR="${FDBLIBDIR:-/usr/local/lib}"
libfdbc="libfdb_c.dylib"
elif [[ "${platform}" == "Linux" ]] ; then
FDBLIBDIR="${FDBLIBDIR:-/usr/lib}"
libfdbc="libfdb_c.so"
custom_libdir="${FDBLIBDIR:-}"
FDBLIBDIR=""
if [[ -z "${custom_libdir}" ]]; then
search_libdirs=( '/usr/lib' '/usr/lib64' )
else
search_libdirs=( "${custom_libdir}" )
fi
for libdir in "${search_libdirs[@]}" ; do
if [[ -e "${libdir}/${libfdbc}" ]]; then
FDBLIBDIR="${libdir}"
break
fi
done
if [[ -z "${FDBLIBDIR}" ]]; then
echo "The FoundationDB C library could not be found in any of:"
for libdir in "${search_libdirs[@]}" ; do
echo " ${libdir}"
done
echo "Your installation may be incomplete, or you need to set a custom FDBLIBDIR."
let status="${status} + 1"
fi
else
echo "Unsupported platform ${platform}".
echo "At the moment, only macOS and Linux are supported by this script."
@ -197,7 +221,7 @@ else
fi
else
echo "Downloading foundation repository into ${destdir}:"
cmd=( 'git' '-C' "${destdir}" 'clone' '--branch' "release-${FDBVER}" "git@${REMOTE}:${FDBREPO}.git" )
cmd=( 'git' '-C' "${destdir}" 'clone' '--branch' "release-${FDBVER}" "https://${REMOTE}/${FDBREPO}.git" )
echo "${cmd[*]}"
if ! "${cmd[@]}" ; then
@ -272,17 +296,12 @@ else
# Do not install if only downloading
:
elif [[ "${status}" -eq 0 ]] ; then
cgo_cflags="-g -O2 -I${linkpath}/bindings/c"
cgo_ldflags="-g -O2 -L${FDBLIBDIR}"
cgo_cppflags="-I${linkpath}/bindings/c"
cgo_cflags="-g -O2"
cgo_ldflags="-L${FDBLIBDIR}"
fdb_go_path="${REMOTE}/${FDBREPO}/bindings/go/src"
if [[ ! -e "${FDBLIBDIR}/${libfdbc}" ]] ; then
# Just a warning. Don't fail script.
echo
echo "WARNING: The FoundationDB C library was not found within ${FDBLIBDIR}."
echo "Your installation may be incomplete."
echo
elif ! CGO_CFLAGS="${cgo_cflags}" CGO_LDFLAGS="${cgo_ldflags}" go install "${fdb_go_path}/fdb" "${fdb_go_path}/fdb/tuple" "${fdb_go_path}/fdb/subspace" "${fdb_go_path}/fdb/directory" ; then
if ! CGO_CPPFLAGS="${cgo_cppflags}" CGO_CFLAGS="${cgo_cflags}" CGO_LDFLAGS="${cgo_ldflags}" go install "${fdb_go_path}/fdb" "${fdb_go_path}/fdb/tuple" "${fdb_go_path}/fdb/subspace" "${fdb_go_path}/fdb/directory" ; then
let status="${status} + 1"
echo "Could not build FoundationDB go libraries."
fi
@ -295,6 +314,7 @@ else
echo "The FoundationDB go bindings were successfully installed."
echo "To build packages which use the go bindings, you will need to"
echo "set the following environment variables:"
echo " CGO_CPPFLAGS=\"${cgo_cppflags}\""
echo " CGO_CFLAGS=\"${cgo_cflags}\""
echo " CGO_LDFLAGS=\"${cgo_ldflags}\""
fi

View File

@ -25,7 +25,7 @@ Package fdb provides an interface to FoundationDB databases (version 2.0 or high
To build and run programs using this package, you must have an installed copy of
the FoundationDB client libraries (version 2.0.0 or later), available for Linux,
Windows and OS X at https://www.foundationdb.org/downloads/fdb-c/.
Windows and OS X at https://www.foundationdb.org/download/.
This documentation specifically applies to the FoundationDB Go binding. For more
extensive guidance to programming with FoundationDB, as well as API

Binary file not shown.

View File

@ -38,6 +38,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"math"
"github.com/apple/foundationdb/bindings/go/src/fdb"
)
@ -94,6 +95,14 @@ var sizeLimits = []uint64{
1<<(8*8) - 1,
}
func bisectLeft(u uint64) int {
var n int
for sizeLimits[n] < u {
n++
}
return n
}
func adjustFloatBytes(b []byte, encode bool) {
if (encode && b[0]&0x80 != 0x00) || (!encode && b[0]&0x80 == 0x00) {
// Negative numbers: flip all of the bytes.
@ -106,109 +115,128 @@ func adjustFloatBytes(b []byte, encode bool) {
}
}
func encodeBytes(buf *bytes.Buffer, code byte, b []byte) {
buf.WriteByte(code)
buf.Write(bytes.Replace(b, []byte{0x00}, []byte{0x00, 0xFF}, -1))
buf.WriteByte(0x00)
type packer struct {
buf []byte
}
func bisectLeft(u uint64) int {
var n int
for sizeLimits[n] < u {
n++
func (p *packer) putByte(b byte) {
p.buf = append(p.buf, b)
}
func (p *packer) putBytes(b []byte) {
p.buf = append(p.buf, b...)
}
func (p *packer) putBytesNil(b []byte, i int) {
for i >= 0 {
p.putBytes(b[:i+1])
p.putByte(0xFF)
b = b[i+1:]
i = bytes.IndexByte(b, 0x00)
}
return n
p.putBytes(b)
}
func encodeInt(buf *bytes.Buffer, i int64) {
func (p *packer) encodeBytes(code byte, b []byte) {
p.putByte(code)
if i := bytes.IndexByte(b, 0x00); i >= 0 {
p.putBytesNil(b, i)
} else {
p.putBytes(b)
}
p.putByte(0x00)
}
func (p *packer) encodeInt(i int64) {
if i == 0 {
buf.WriteByte(0x14)
p.putByte(0x14)
return
}
var n int
var ibuf bytes.Buffer
var scratch [8]byte
switch {
case i > 0:
n = bisectLeft(uint64(i))
buf.WriteByte(byte(intZeroCode + n))
binary.Write(&ibuf, binary.BigEndian, i)
p.putByte(byte(intZeroCode + n))
binary.BigEndian.PutUint64(scratch[:], uint64(i))
case i < 0:
n = bisectLeft(uint64(-i))
buf.WriteByte(byte(0x14 - n))
binary.Write(&ibuf, binary.BigEndian, int64(sizeLimits[n])+i)
p.putByte(byte(0x14 - n))
offsetEncoded := int64(sizeLimits[n]) + i
binary.BigEndian.PutUint64(scratch[:], uint64(offsetEncoded))
}
buf.Write(ibuf.Bytes()[8-n:])
p.putBytes(scratch[8-n:])
}
func encodeFloat(buf *bytes.Buffer, f float32) {
var ibuf bytes.Buffer
binary.Write(&ibuf, binary.BigEndian, f)
buf.WriteByte(floatCode)
out := ibuf.Bytes()
adjustFloatBytes(out, true)
buf.Write(out)
func (p *packer) encodeFloat(f float32) {
var scratch [4]byte
binary.BigEndian.PutUint32(scratch[:], math.Float32bits(f))
adjustFloatBytes(scratch[:], true)
p.putByte(floatCode)
p.putBytes(scratch[:])
}
func encodeDouble(buf *bytes.Buffer, d float64) {
var ibuf bytes.Buffer
binary.Write(&ibuf, binary.BigEndian, d)
buf.WriteByte(doubleCode)
out := ibuf.Bytes()
adjustFloatBytes(out, true)
buf.Write(out)
func (p *packer) encodeDouble(d float64) {
var scratch [8]byte
binary.BigEndian.PutUint64(scratch[:], math.Float64bits(d))
adjustFloatBytes(scratch[:], true)
p.putByte(doubleCode)
p.putBytes(scratch[:])
}
func encodeUUID(buf *bytes.Buffer, u UUID) {
buf.WriteByte(uuidCode)
buf.Write(u[:])
func (p *packer) encodeUUID(u UUID) {
p.putByte(uuidCode)
p.putBytes(u[:])
}
func encodeTuple(buf *bytes.Buffer, t Tuple, nested bool) {
func (p *packer) encodeTuple(t Tuple, nested bool) {
if nested {
buf.WriteByte(nestedCode)
p.putByte(nestedCode)
}
for i, e := range t {
switch e := e.(type) {
case Tuple:
encodeTuple(buf, e, true)
p.encodeTuple(e, true)
case nil:
buf.WriteByte(nilCode)
p.putByte(nilCode)
if nested {
buf.WriteByte(0xff)
p.putByte(0xff)
}
case int64:
encodeInt(buf, e)
p.encodeInt(e)
case int:
encodeInt(buf, int64(e))
p.encodeInt(int64(e))
case []byte:
encodeBytes(buf, bytesCode, e)
p.encodeBytes(bytesCode, e)
case fdb.KeyConvertible:
encodeBytes(buf, bytesCode, []byte(e.FDBKey()))
p.encodeBytes(bytesCode, []byte(e.FDBKey()))
case string:
encodeBytes(buf, stringCode, []byte(e))
p.encodeBytes(stringCode, []byte(e))
case float32:
encodeFloat(buf, e)
p.encodeFloat(e)
case float64:
encodeDouble(buf, e)
p.encodeDouble(e)
case bool:
if e {
buf.WriteByte(trueCode)
p.putByte(trueCode)
} else {
buf.WriteByte(falseCode)
p.putByte(falseCode)
}
case UUID:
encodeUUID(buf, e)
p.encodeUUID(e)
default:
panic(fmt.Sprintf("unencodable element at index %d (%v, type %T)", i, t[i], t[i]))
}
}
if nested {
buf.WriteByte(0x00)
p.putByte(0x00)
}
}
@ -221,9 +249,9 @@ func encodeTuple(buf *bytes.Buffer, t Tuple, nested bool) {
// call Pack when using a Tuple with a FoundationDB API function that requires a
// key.
func (t Tuple) Pack() []byte {
buf := new(bytes.Buffer)
encodeTuple(buf, t, false)
return buf.Bytes()
p := packer{buf: make([]byte, 0, 64)}
p.encodeTuple(t, false)
return p.buf
}
func findTerminator(b []byte) int {

View File

@ -0,0 +1,120 @@
package tuple
import (
"bytes"
"encoding/gob"
"flag"
"math/rand"
"os"
"testing"
)
var update = flag.Bool("update", false, "update .golden files")
func loadGolden(t *testing.T) (golden map[string][]byte) {
f, err := os.Open("testdata/tuples.golden")
if err != nil {
t.Fatalf("failed to open golden file: %s", err)
}
defer f.Close()
err = gob.NewDecoder(f).Decode(&golden)
if err != nil {
t.Fatalf("failed to decode golden file: %s", err)
}
return
}
func writeGolden(t *testing.T, golden map[string][]byte) {
f, err := os.Create("testdata/tuples.golden")
if err != nil {
t.Fatalf("failed to open golden file: %s", err)
}
defer f.Close()
err = gob.NewEncoder(f).Encode(golden)
if err != nil {
t.Fatalf("failed to encode golden file: %s", err)
}
}
var testUUID = UUID{
0x11, 0x00, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF,
0x11, 0x00, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF,
}
func genBytes() interface{} { return []byte("namespace") }
func genBytesNil() interface{} { return []byte{0xFF, 0x00, 0xFF} }
func genString() interface{} { return "namespace" }
func genStringNil() interface{} { return "nam\x00es\xFFpace" }
func genInt() interface{} { return rand.Int63() }
func genFloat() interface{} { return float32(rand.NormFloat64()) }
func genDouble() interface{} { return rand.NormFloat64() }
func mktuple(gen func() interface{}, count int) Tuple {
tt := make(Tuple, count)
for i := 0; i < count; i++ {
tt[i] = gen()
}
return tt
}
var testCases = []struct {
name string
tuple Tuple
}{
{"Simple", Tuple{testUUID, "foobarbaz", 1234, nil}},
{"Namespaces", Tuple{testUUID, "github", "com", "apple", "foundationdb", "tree"}},
{"ManyStrings", mktuple(genString, 8)},
{"ManyStringsNil", mktuple(genStringNil, 8)},
{"ManyBytes", mktuple(genBytes, 20)},
{"ManyBytesNil", mktuple(genBytesNil, 20)},
{"LargeBytes", Tuple{testUUID, bytes.Repeat([]byte("abcd"), 20)}},
{"LargeBytesNil", Tuple{testUUID, bytes.Repeat([]byte{0xFF, 0x0, 0xFF}, 20)}},
{"Integers", mktuple(genInt, 20)},
{"Floats", mktuple(genFloat, 20)},
{"Doubles", mktuple(genDouble, 20)},
{"UUIDs", Tuple{testUUID, true, testUUID, false, testUUID, true, testUUID, false, testUUID, true}},
{"NilCases", Tuple{"\x00", "\x00\xFF", "\x00\x00\x00", "\xFF\x00", ""}},
{"Nested", Tuple{testUUID, mktuple(genInt, 4), nil, mktuple(genBytes, 4), nil, mktuple(genDouble, 4), nil}},
}
func TestTuplePacking(t *testing.T) {
var golden map[string][]byte
if *update {
golden = make(map[string][]byte)
} else {
golden = loadGolden(t)
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
result := tt.tuple.Pack()
if *update {
golden[tt.name] = result
return
}
if !bytes.Equal(result, golden[tt.name]) {
t.Errorf("packing mismatch: expected %v, got %v", golden[tt.name], result)
}
})
}
if *update {
writeGolden(t, golden)
}
}
func BenchmarkTuplePacking(b *testing.B) {
for _, bm := range testCases {
b.Run(bm.name, func(b *testing.B) {
tuple := bm.tuple
for i := 0; i < b.N; i++ {
_ = tuple.Pack()
}
})
}
}

View File

@ -10,7 +10,7 @@
<packaging>jar</packaging>
<name>foundationdb-java</name>
<description>Java bindings for the FoundationDB database. These bindings require the FoundationDB client, which is under a different license. The client can be obtained from https://www.foundationdb.org/downloads/fdb-c/.</description>
<description>Java bindings for the FoundationDB database. These bindings require the FoundationDB client, which is under a different license. The client can be obtained from https://www.foundationdb.org/download/.</description>
<inceptionYear>2010</inceptionYear>
<url>https://www.foundationdb.org</url>

View File

@ -1035,7 +1035,10 @@ public class DirectoryLayer implements Directory {
Range oldAllocations = new Range(allocator.recent.getKey(), allocator.recent.get(windowStart).getKey());
CompletableFuture<byte[]> newCountRead;
// SOMEDAY: synchronize on something transaction local
// SOMEDAY: this code would work as written if synchronized on something transaction local.
// The reason we don't synchronize on the Transaction tr itself is that the user could also be using it
// for synchronization. If they did, then synchronizing on tr here could lead to performance issues or
// deadlocks.
synchronized(HighContentionAllocator.class) {
if(windowStart > initialWindowStart) {
tr.clear(oldCounters);
@ -1073,7 +1076,10 @@ public class DirectoryLayer implements Directory {
AsyncIterable<KeyValue> counterRange;
CompletableFuture<byte[]> allocationTemp;
// SOMEDAY: synchronize on something transaction local
// SOMEDAY: this code would work as written if synchronized on something transaction local.
// The reason we don't synchronize on the Transaction tr itself is that the user could also be using it
// for synchronization. If they did, then synchronizing on tr here could lead to performance issues or
// deadlocks.
synchronized(HighContentionAllocator.class) {
counterRange = tr.snapshot().getRange(countersRange, 1, true);
allocationTemp = tr.get(allocationKey);

View File

@ -1,3 +1,3 @@
Complete documentation of the FoundationDB Python API can be found at https://apple.github.io/foundationdb/api-python.html.
These bindings require the FoundationDB client. The client can be obtained from https://www.foundationdb.org/downloads/fdb-c/.
These bindings require the FoundationDB client. The client can be obtained from https://www.foundationdb.org/download/.

View File

@ -18,5 +18,5 @@ EOF
s.license = 'Apache v2'
s.add_dependency('ffi', '>= 1.1.5')
s.required_ruby_version = '>= 1.9.3'
s.requirements << 'These bindings require the FoundationDB client. The client can be obtained from https://www.foundationdb.org/downloads/fdb-c/.'
s.requirements << 'These bindings require the FoundationDB client. The client can be obtained from https://www.foundationdb.org/download/.'
end

View File

@ -121,7 +121,9 @@ ifeq ($(PLATFORM),linux)
@rm -f packages/foundationdb-server-*.rpm packages/foundationdb-clients-*.rpm
@packaging/rpm/buildrpms.sh $(VERSION) $(PKGRELEASE)
packages: DEB RPM JAVA_RELEASE FDBTESTS FDBHEADERS
FDBTLS: bin/fdb-libressl-plugin.$(DLEXT)
packages: DEB RPM JAVA_RELEASE FDBTESTS FDBHEADERS FDBTLS
packages_clean: DEB_clean RPM_clean JAVA_RELEASE_clean FDBHEADERS_clean

View File

@ -37,16 +37,20 @@
"counter":0,
"roughness":0.0
},
"kvstore_used_bytes":12341234,
"stored_bytes":12341234,
"kvstore_used_bytes":12341234,
"kvstore_available_bytes":12341234,
"kvstore_free_bytes":12341234,
"kvstore_total_bytes":12341234,
"durable_bytes":{
"hz":0.0,
"counter":0,
"roughness":0.0
},
"queue_disk_used_bytes":12341234,
"queue_disk_available_bytes":12341234,
"queue_disk_free_bytes":12341234,
"persistent_disk_used_bytes":12341234,
"queue_disk_total_bytes":12341234,
"role":{
"$enum":[
"master",
@ -59,12 +63,7 @@
},
"data_version":12341234,
"data_version_lag":12341234,
"persistent_disk_total_bytes":12341234,
"queue_disk_total_bytes":12341234,
"persistent_disk_free_bytes":12341234,
"queue_disk_used_bytes":12341234,
"id":"eb84471d68c12d1d26f692a50000003f",
"kvstore_total_bytes":12341234,
"finished_queries":{
"hz":0.0,
"counter":0,
@ -289,7 +288,9 @@
"roughness":0.0
},
"reads":{
"hz":0.0
"hz":0.0,
"counter":0,
"roughness":0.0
}
},
"bytes":{
@ -297,6 +298,18 @@
"hz":0.0,
"counter":0,
"roughness":0.0
},
"read":{
"hz":0.0,
"counter":0,
"roughness":0.0
}
},
"keys":{
"read":{
"hz":0.0,
"counter":0,
"roughness":0.0
}
},
"transactions":{

View File

@ -149,8 +149,8 @@ FoundationDB includes a few modules that make it easy to model data using this a
The :mod:`directory` module lets us open a :ref:`directory <developer-guide-directories>` in the database::
schedulingDir, err := directory.CreateOrOpen(db, []string{"scheduling"}, nil)
if e != nil {
log.Fatal(e)
if err != nil {
log.Fatal(err)
}
The :func:`CreateOrOpen` function returns a :ref:`subspace <developer-guide-sub-keyspaces>` where we'll store our application data. Each subspace has a fixed prefix it uses when defining keys. The prefix corresponds to the first element of a tuple. We decided that we wanted ``"attends"`` and ``"class"`` as our prefixes, so we'll create new subspaces for them within the ``scheduling`` subspace.::

View File

@ -79,9 +79,9 @@ Plugin default location
Similarly, if a value is not specified for the parameter ``tls_plugin``, the file will be specified by the environment variable ``FDB_TLS_PLUGIN`` or, if this variable is not set, the system-dependent location:
* Linux: ``/usr/lib/foundationdb/plugins/FDBLibTLS.so``
* macOS: ``/usr/local/foundationdb/plugins/FDBLibTLS.dylib``
* Windows: ``C:\Program Files\foundationdb\plugins\FDBLibTLS.dll``
* Linux: ``/usr/lib/foundationdb/plugins/fdb-libressl-plugin.so``
* macOS: ``/usr/local/foundationdb/plugins/fdb-libressl-plugin.dylib``
* Windows: ``C:\Program Files\foundationdb\plugins\fdb-libressl-plugin.dll``
On Windows, this location will be relative to the chosen installation location. The environment variable ``FOUNDATIONDB_INSTALL_PATH`` will be used in place of ``C:\Program Files\foundationdb\`` to determine this location.
@ -183,11 +183,11 @@ If the default checking of the certificate chain is too stringent, the verificat
===================== =============================================================
Setting Result
===================== =============================================================
``Check.Valid=0`` Sets the current process to disable all further verification
``Check.Valid=0`` Sets the current process to disable all further verification
of a peer certificate.
``Check.Unexpired=0`` Disables date checking of peer certificates. If the clocks in
the cluster and between the clients and servers are not to be
trusted, setting this value to ``0`` can allow communications
``Check.Unexpired=0`` Disables date checking of peer certificates. If the clocks in
the cluster and between the clients and servers are not to be
trusted, setting this value to ``0`` can allow communications
to proceed.
===================== =============================================================

View File

@ -854,7 +854,7 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
outputString += "\n Redundancy mode - ";
std::string strVal;
if (statusObjConfig.get("redundancy.factor", strVal)){
if (statusObjConfig.get("redundancy_mode", strVal)){
outputString += strVal;
} else
outputString += "unknown";

View File

@ -139,6 +139,7 @@ static std::string describe( std::set<T> const& items, int max_items = -1 ) {
}
std::string printable( const StringRef& val );
std::string printable( const std::string& val );
std::string printable( const Optional<StringRef>& val );
std::string printable( const Optional<Standalone<StringRef>>& val );
std::string printable( const KeyRangeRef& range );

View File

@ -717,7 +717,7 @@ namespace fileBackup {
return Void();
}
ACTOR static Future<Void> abortOldBackup(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, std::string tagName) {
ACTOR static Future<Void> abortFiveZeroBackup(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, std::string tagName) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -761,7 +761,7 @@ namespace fileBackup {
return Void();
}
struct AbortOldBackupTask : TaskFuncBase {
struct AbortFiveZeroBackupTask : TaskFuncBase {
static StringRef name;
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state FileBackupAgent backupAgent;
@ -772,14 +772,14 @@ namespace fileBackup {
TraceEvent(SevInfo, "FileBackupCancelOldTask")
.detail("task", printable(task->params[Task::reservedTaskParamKeyType]))
.detail("tagName", tagName);
Void _ = wait(abortOldBackup(&backupAgent, tr, tagName));
Void _ = wait(abortFiveZeroBackup(&backupAgent, tr, tagName));
Void _ = wait(taskBucket->finish(tr, task));
return Void();
}
virtual StringRef getName() const {
TraceEvent(SevError, "FileBackupError").detail("cause", "AbortOldBackupTaskFunc::name() should never be called");
TraceEvent(SevError, "FileBackupError").detail("cause", "AbortFiveZeroBackupTaskFunc::name() should never be called");
ASSERT(false);
return StringRef();
}
@ -787,16 +787,86 @@ namespace fileBackup {
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return Future<Void>(Void()); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef AbortOldBackupTask::name = LiteralStringRef("abort_legacy_backup");
REGISTER_TASKFUNC(AbortOldBackupTask);
REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_backup_diff_logs);
REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_backup_log_range);
REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_backup_logs);
REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_backup_range);
REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_backup_restorable);
REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_finish_full_backup);
REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_finished_full_backup);
REGISTER_TASKFUNC_ALIAS(AbortOldBackupTask, file_start_full_backup);
StringRef AbortFiveZeroBackupTask::name = LiteralStringRef("abort_legacy_backup");
REGISTER_TASKFUNC(AbortFiveZeroBackupTask);
REGISTER_TASKFUNC_ALIAS(AbortFiveZeroBackupTask, file_backup_diff_logs);
REGISTER_TASKFUNC_ALIAS(AbortFiveZeroBackupTask, file_backup_log_range);
REGISTER_TASKFUNC_ALIAS(AbortFiveZeroBackupTask, file_backup_logs);
REGISTER_TASKFUNC_ALIAS(AbortFiveZeroBackupTask, file_backup_range);
REGISTER_TASKFUNC_ALIAS(AbortFiveZeroBackupTask, file_backup_restorable);
REGISTER_TASKFUNC_ALIAS(AbortFiveZeroBackupTask, file_finish_full_backup);
REGISTER_TASKFUNC_ALIAS(AbortFiveZeroBackupTask, file_finished_full_backup);
REGISTER_TASKFUNC_ALIAS(AbortFiveZeroBackupTask, file_start_full_backup);
ACTOR static Future<Void> abortFiveOneBackup(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, std::string tagName) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state KeyBackedTag tag = makeBackupTag(tagName);
state UidAndAbortedFlagT current = wait(tag.getOrThrow(tr, false, backup_unneeded()));
state BackupConfig config(current.first);
EBackupState status = wait(config.stateEnum().getD(tr, EBackupState::STATE_NEVERRAN));
if (!backupAgent->isRunnable((BackupAgentBase::enumState)status)) {
throw backup_unneeded();
}
TraceEvent(SevInfo, "FBA_abortFileOneBackup")
.detail("tagName", tagName.c_str())
.detail("status", BackupAgentBase::getStateText(status));
// Cancel backup task through tag
Void _ = wait(tag.cancel(tr));
Key configPath = uidPrefixKey(logRangesRange.begin, config.getUid());
Key logsPath = uidPrefixKey(backupLogKeys.begin, config.getUid());
tr->clear(KeyRangeRef(configPath, strinc(configPath)));
tr->clear(KeyRangeRef(logsPath, strinc(logsPath)));
config.stateEnum().set(tr, EBackupState::STATE_ABORTED);
return Void();
}
struct AbortFiveOneBackupTask : TaskFuncBase {
static StringRef name;
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state FileBackupAgent backupAgent;
state BackupConfig config(task);
state std::string tagName = wait(config.tag().getOrThrow(tr));
TEST(true); // Canceling 5.1 backup task
TraceEvent(SevInfo, "FileBackupCancelFiveOneTask")
.detail("task", printable(task->params[Task::reservedTaskParamKeyType]))
.detail("tagName", tagName);
Void _ = wait(abortFiveOneBackup(&backupAgent, tr, tagName));
Void _ = wait(taskBucket->finish(tr, task));
return Void();
}
virtual StringRef getName() const {
TraceEvent(SevError, "FileBackupError").detail("cause", "AbortFiveOneBackupTaskFunc::name() should never be called");
ASSERT(false);
return StringRef();
}
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return Future<Void>(Void()); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef AbortFiveOneBackupTask::name = LiteralStringRef("abort_legacy_backup_5.2");
REGISTER_TASKFUNC(AbortFiveOneBackupTask);
REGISTER_TASKFUNC_ALIAS(AbortFiveOneBackupTask, file_backup_write_range);
REGISTER_TASKFUNC_ALIAS(AbortFiveOneBackupTask, file_backup_dispatch_ranges);
REGISTER_TASKFUNC_ALIAS(AbortFiveOneBackupTask, file_backup_write_logs);
REGISTER_TASKFUNC_ALIAS(AbortFiveOneBackupTask, file_backup_erase_logs);
REGISTER_TASKFUNC_ALIAS(AbortFiveOneBackupTask, file_backup_dispatch_logs);
REGISTER_TASKFUNC_ALIAS(AbortFiveOneBackupTask, file_backup_finished);
REGISTER_TASKFUNC_ALIAS(AbortFiveOneBackupTask, file_backup_write_snapshot_manifest);
REGISTER_TASKFUNC_ALIAS(AbortFiveOneBackupTask, file_backup_start);
std::function<void(Reference<Task>)> NOP_SETUP_TASK_FN = [](Reference<Task> task) { /* NOP */ };
ACTOR static Future<Key> addBackupTask(StringRef name,
@ -1137,7 +1207,7 @@ namespace fileBackup {
}
};
StringRef BackupRangeTaskFunc::name = LiteralStringRef("file_backup_write_range");
StringRef BackupRangeTaskFunc::name = LiteralStringRef("file_backup_write_range_5.2");
const uint32_t BackupRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(BackupRangeTaskFunc);
@ -1620,7 +1690,7 @@ namespace fileBackup {
}
};
StringRef BackupSnapshotDispatchTask::name = LiteralStringRef("file_backup_dispatch_ranges");
StringRef BackupSnapshotDispatchTask::name = LiteralStringRef("file_backup_dispatch_ranges_5.2");
const uint32_t BackupSnapshotDispatchTask::version = 1;
REGISTER_TASKFUNC(BackupSnapshotDispatchTask);
@ -1816,7 +1886,7 @@ namespace fileBackup {
}
};
StringRef BackupLogRangeTaskFunc::name = LiteralStringRef("file_backup_write_logs");
StringRef BackupLogRangeTaskFunc::name = LiteralStringRef("file_backup_write_logs_5.2");
const uint32_t BackupLogRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(BackupLogRangeTaskFunc);
@ -1885,7 +1955,7 @@ namespace fileBackup {
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef EraseLogRangeTaskFunc::name = LiteralStringRef("file_backup_erase_logs");
StringRef EraseLogRangeTaskFunc::name = LiteralStringRef("file_backup_erase_logs_5.2");
const uint32_t EraseLogRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(EraseLogRangeTaskFunc);
@ -2002,7 +2072,7 @@ namespace fileBackup {
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return Void(); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef BackupLogsDispatchTask::name = LiteralStringRef("file_backup_dispatch_logs");
StringRef BackupLogsDispatchTask::name = LiteralStringRef("file_backup_dispatch_logs_5.2");
const uint32_t BackupLogsDispatchTask::version = 1;
REGISTER_TASKFUNC(BackupLogsDispatchTask);
@ -2042,7 +2112,7 @@ namespace fileBackup {
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return Void(); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef FileBackupFinishedTask::name = LiteralStringRef("file_backup_finished");
StringRef FileBackupFinishedTask::name = LiteralStringRef("file_backup_finished_5.2");
const uint32_t FileBackupFinishedTask::version = 1;
REGISTER_TASKFUNC(FileBackupFinishedTask);
@ -2199,7 +2269,7 @@ namespace fileBackup {
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef BackupSnapshotManifest::name = LiteralStringRef("file_backup_write_snapshot_manifest");
StringRef BackupSnapshotManifest::name = LiteralStringRef("file_backup_write_snapshot_manifest_5.2");
const uint32_t BackupSnapshotManifest::version = 1;
REGISTER_TASKFUNC(BackupSnapshotManifest);
@ -2285,7 +2355,7 @@ namespace fileBackup {
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef StartFullBackupTaskFunc::name = LiteralStringRef("file_backup_start");
StringRef StartFullBackupTaskFunc::name = LiteralStringRef("file_backup_start_5.2");
const uint32_t StartFullBackupTaskFunc::version = 1;
REGISTER_TASKFUNC(StartFullBackupTaskFunc);
@ -2802,7 +2872,7 @@ namespace fileBackup {
TraceEvent("FileRestoreDispatch")
.detail("RestoreUID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginFile", printable(Params.beginFile().get(task)))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("RestoreVersion", restoreVersion)
.detail("ApplyLag", applyLag)
@ -2816,7 +2886,7 @@ namespace fileBackup {
TraceEvent("FileRestoreDispatch")
.detail("RestoreUID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginFile", printable(Params.beginFile().get(task)))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("RestoreVersion", restoreVersion)
.detail("ApplyLag", applyLag)
@ -2830,7 +2900,7 @@ namespace fileBackup {
TraceEvent("FileRestoreDispatch")
.detail("RestoreUID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginFile", printable(Params.beginFile().get(task)))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("ApplyLag", applyLag)
.detail("Decision", "restore_complete")
@ -2938,7 +3008,7 @@ namespace fileBackup {
TraceEvent("FileRestoreDispatch")
.detail("RestoreUID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginFile", printable(Params.beginFile().get(task)))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("EndVersion", endVersion)
.detail("ApplyLag", applyLag)
@ -2984,7 +3054,7 @@ namespace fileBackup {
TraceEvent("FileRestoreDispatch")
.detail("RestoreUID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginFile", printable(Params.beginFile().get(task)))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("EndVersion", endVersion)
.detail("ApplyLag", applyLag)

View File

@ -124,6 +124,10 @@ std::string printable( const StringRef& val ) {
return val.printable();
}
std::string printable( const std::string& str ) {
return StringRef(str).printable();
}
std::string printable( const Optional<StringRef>& val ) {
if( val.present() )
return printable( val.get() );

View File

@ -344,12 +344,11 @@ struct StorageQueuingMetricsReply {
int64_t instanceID; // changes if bytesDurable and bytesInput reset
int64_t bytesDurable, bytesInput;
StorageBytes storageBytes;
double readReplyRate; // for status
Version v; // current storage server version
template <class Ar>
void serialize(Ar& ar) {
ar & localTime & instanceID & bytesDurable & bytesInput & readReplyRate & v & storageBytes;
ar & localTime & instanceID & bytesDurable & bytesInput & v & storageBytes;
}
};

View File

@ -142,6 +142,8 @@ description is not currently required but encouraged.
description="Deprecated" />
<Option name="durability_datacenter" code="110" />
<Option name="durability_risky" code="120" />
<Option name="durability_dev_null_is_web_scale" code="130"
description="Deprecated"/>
<Option name="priority_system_immediate" code="200"
description="Specifies that this transaction should be treated as highest priority and that lower priority transactions should block behind this one. Use is discouraged outside of low-level tools" />
<Option name="priority_batch" code="201"

View File

@ -615,7 +615,7 @@ void print_usage(const char* name) {
" fdbmonitor. The default is `/var/run/fdbmonitor.pid'.\n"
" --loggroup LOGGROUP\n"
" Sets the 'LogGroup' field with the specified value for all\n"
" entries in the log output. The default log group is 'default'."
" entries in the log output. The default log group is 'default'.\n"
" --daemonize Background the fdbmonitor process.\n"
" -h, --help Display this help and exit.\n", name);
}

View File

@ -309,7 +309,7 @@ void TLSOptions::init_plugin( std::string const& plugin_path ) {
} else {
if ( !platform::getEnvironmentVar( "FDB_TLS_PLUGIN", path ) )
// FIXME: should there be other fallbacks?
path = platform::getDefaultPluginPath("FDBLibTLS");
path = platform::getDefaultPluginPath("fdb-libressl-plugin");
}
TraceEvent("TLSConnectionLoadingPlugin").detail("PluginPath", path);

View File

@ -84,14 +84,13 @@ struct StorageQueueInfo {
Smoother smoothDurableVersion, smoothLatestVersion;
Smoother smoothFreeSpace;
Smoother smoothTotalSpace;
double readReplyRate;
limitReason_t limitReason;
StorageQueueInfo(UID id, LocalityData locality) : valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothDurableVersion(1.), smoothLatestVersion(1.), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), readReplyRate(0.0), limitReason(limitReason_t::unlimited)
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited)
{
// FIXME: this is a tacky workaround for a potential unitialized use in trackStorageServerQueueInfo
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
lastReply.instanceID = -1;
}
};
@ -107,7 +106,7 @@ struct TLogQueueInfo {
TLogQueueInfo(UID id) : valid(false), id(id), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT) {
// FIXME: this is a tacky workaround for a potential unitialized use in trackTLogQueueInfo (copied from storageQueueInfO)
// FIXME: this is a tacky workaround for a potential uninitialized use in trackTLogQueueInfo (copied from storageQueueInfO)
lastReply.instanceID = -1;
}
};
@ -147,7 +146,6 @@ ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI
myQueueInfo->value.valid = true;
myQueueInfo->value.prevReply = myQueueInfo->value.lastReply;
myQueueInfo->value.lastReply = reply.get();
myQueueInfo->value.readReplyRate = reply.get().readReplyRate;
if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) {
myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable);
myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable);
@ -263,7 +261,6 @@ void updateRate( Ratekeeper* self ) {
limitReason_t limitReason = limitReason_t::unlimited;
int sscount = 0;
double readReplyRateSum=0.0;
int64_t worstFreeSpaceStorageServer = std::numeric_limits<int64_t>::max();
int64_t worstStorageQueueStorageServer = 0;
@ -279,8 +276,6 @@ void updateRate( Ratekeeper* self ) {
ss.limitReason = limitReason_t::unlimited;
readReplyRateSum += ss.readReplyRate;
int64_t minFreeSpace = std::max(SERVER_KNOBS->MIN_FREE_SPACE, (int64_t)(SERVER_KNOBS->MIN_FREE_SPACE_RATIO * ss.smoothTotalSpace.smoothTotal()));
worstFreeSpaceStorageServer = std::min(worstFreeSpaceStorageServer, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace);
@ -499,7 +494,6 @@ void updateRate( Ratekeeper* self ) {
.detail("StorageServers", sscount)
.detail("Proxies", self->proxy_transactionCountAndTime.size())
.detail("TLogs", tlcount)
.detail("ReadReplyRate", readReplyRateSum)
.detail("WorstFreeSpaceStorageServer", worstFreeSpaceStorageServer)
.detail("WorstFreeSpaceTLog", worstFreeSpaceTLog)
.detail("WorstStorageServerQueue", worstStorageQueueStorageServer)

View File

@ -1296,9 +1296,13 @@ static std::set<StringRef> getTLogEligibleMachines(vector<std::pair<WorkerInterf
return tlogEligibleMachines;
}
ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker, std::string dbName, StatusObject *qos, StatusObject *data_overlay, std::set<std::string> *incomplete_reasons) {
ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker,
std::string dbName, StatusObject *qos, StatusObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, std::string>>>> storageServerFuture)
{
state StatusObject statusObj;
state StatusObject operationsObj;
state StatusObject bytesObj;
state StatusObject keysObj;
// Writes and conflicts
try {
@ -1327,10 +1331,7 @@ ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struc
}
operationsObj["writes"] = mutations;
StatusObject bytesObj;
bytesObj["written"] = mutationBytes;
statusObj["bytes"] = bytesObj;
StatusObject transactions;
transactions["conflicted"] = txnConflicts;
@ -1345,12 +1346,11 @@ ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struc
incomplete_reasons->insert("Unknown mutations, conflicts, and transactions state.");
}
// Transactions and reads
// Transactions
try {
Standalone<StringRef> md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest(StringRef(dbName+"/RkUpdate") ) ), 1.0) );
double tpsLimit = parseDouble(extractAttribute(md, LiteralStringRef("TPSLimit")));
double transPerSec = parseDouble(extractAttribute(md, LiteralStringRef("ReleasedTPS")));
double readReplyRate = parseDouble(extractAttribute(md, LiteralStringRef("ReadReplyRate")));
int ssCount = parseInt(extractAttribute(md, LiteralStringRef("StorageServers")));
int tlogCount = parseInt(extractAttribute(md, LiteralStringRef("TLogs")));
int64_t worstFreeSpaceStorageServer = parseInt64(extractAttribute(md, LiteralStringRef("WorstFreeSpaceStorageServer")));
@ -1361,11 +1361,6 @@ ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struc
int64_t totalDiskUsageBytes = parseInt64(extractAttribute(md, LiteralStringRef("TotalDiskUsageBytes")));
int64_t worstVersionLag = parseInt64(extractAttribute(md, LiteralStringRef("WorstStorageServerVersionLag")));
int64_t limitingVersionLag = parseInt64(extractAttribute(md, LiteralStringRef("LimitingStorageServerVersionLag")));
StatusObject readsObj;
readsObj["hz"] = readReplyRate;
operationsObj["reads"] = readsObj;
(*data_overlay)["total_disk_used_bytes"] = totalDiskUsageBytes;
if(ssCount > 0) {
@ -1406,9 +1401,41 @@ ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struc
} catch (Error &e){
if (e.code() == error_code_actor_cancelled)
throw;
incomplete_reasons->insert("Unknown read and performance state.");
incomplete_reasons->insert("Unknown performance state.");
}
// Reads
try {
ErrorOr<vector<std::pair<StorageServerInterface, std::string>>> storageServers = wait(storageServerFuture);
if(!storageServers.present()) {
throw storageServers.getError();
}
StatusObject reads = makeCounter();
StatusObject readKeys = makeCounter();
StatusObject readBytes = makeCounter();
for(auto &ss : storageServers.get()) {
reads = addCounters(reads, parseCounter(extractAttribute(ss.second, LiteralStringRef("finishedQueries"))));
readKeys = addCounters(readKeys, parseCounter(extractAttribute(ss.second, LiteralStringRef("rowsQueried"))));
readBytes = addCounters(readBytes, parseCounter(extractAttribute(ss.second, LiteralStringRef("bytesQueried"))));
}
operationsObj["reads"] = reads;
keysObj["read"] = readKeys;
bytesObj["read"] = readBytes;
}
catch (Error& e) {
if (e.code() == error_code_actor_cancelled)
throw;
incomplete_reasons->insert("Unknown read state.");
}
statusObj["operations"] = operationsObj;
statusObj["keys"] = keysObj;
statusObj["bytes"] = bytesObj;
return statusObj;
}
@ -1767,13 +1794,6 @@ ACTOR Future<StatusReply> clusterGetStatus(
statusObj["latency_probe"] = latencyProbeResults;
}
state int minReplicasRemaining = -1;
std::vector<Future<StatusObject>> futures2;
futures2.push_back(dataStatusFetcher(mWorker, dbName, &minReplicasRemaining));
futures2.push_back(workloadStatusFetcher(db, workers, mWorker, dbName, &qos, &data_overlay, &status_incomplete_reasons));
futures2.push_back(layerStatusFetcher(cx, &messages, &status_incomplete_reasons));
futures2.push_back(lockedStatusFetcher(db, &messages, &status_incomplete_reasons));
// Start getting storage servers now (using system priority) concurrently. Using sys priority because having storage servers
// in status output is important to give context to error messages in status that reference a storage server role ID.
state std::unordered_map<NetworkAddress, WorkerInterface> address_workers;
@ -1782,6 +1802,13 @@ ACTOR Future<StatusReply> clusterGetStatus(
state Future<ErrorOr<vector<std::pair<StorageServerInterface, std::string>>>> storageServerFuture = errorOr(getStorageServersAndMetrics(cx, address_workers));
state Future<ErrorOr<vector<std::pair<TLogInterface, std::string>>>> tLogFuture = errorOr(getTLogsAndMetrics(db, address_workers));
state int minReplicasRemaining = -1;
std::vector<Future<StatusObject>> futures2;
futures2.push_back(dataStatusFetcher(mWorker, dbName, &minReplicasRemaining));
futures2.push_back(workloadStatusFetcher(db, workers, mWorker, dbName, &qos, &data_overlay, &status_incomplete_reasons, storageServerFuture));
futures2.push_back(layerStatusFetcher(cx, &messages, &status_incomplete_reasons));
futures2.push_back(lockedStatusFetcher(db, &messages, &status_incomplete_reasons));
state std::vector<StatusObject> workerStatuses = wait(getAll(futures2));
int oldLogFaultTolerance = 100;

View File

@ -385,8 +385,6 @@ public:
Promise<Void> coreStarted;
bool shuttingDown;
Smoother readReplyRate; //FIXME: very similar to counters.finishedQueries, new fast load balancing smoother
bool behind;
bool debug_inApplyUpdate;
@ -451,8 +449,7 @@ public:
updateEagerReads(0),
shardChangeCounter(0),
fetchKeysParallelismLock(SERVER_KNOBS->FETCH_KEYS_PARALLELISM_BYTES),
shuttingDown(false), readReplyRate(SERVER_KNOBS->STORAGE_LOGGING_DELAY / 2.0),
debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0),
shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0),
logProtocol(0), counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")),
behind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false),
@ -738,8 +735,6 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
data->metrics.notify(req.key, m);
*/
data->readReplyRate.addDelta(1);
if (v.present()) {
++data->counters.rowsQueried;
data->counters.bytesQueried += v.get().size();
@ -1249,7 +1244,6 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
none.penalty = data->getPenalty();
data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(req.begin.getKey(), req.end.getKey()), std::max<KeyRef>(req.begin.getKey(), req.end.getKey()) ) );
data->readReplyRate.addDelta(1);
req.reply.send( none );
} else {
state int remainingLimitBytes = req.limitBytes;
@ -1273,7 +1267,6 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
m.iosPerKSecond = 1; //FIXME: this should be 1/r.data.size(), but we cannot do that because it is an int
data->metrics.notify(r.data[i].key, m);
}*/
data->readReplyRate.addDelta(1);
r.penalty = data->getPenalty();
req.reply.send( r );
@ -1311,7 +1304,6 @@ ACTOR Future<Void> getKey( StorageServer* data, GetKeyRequest req ) {
Key k = wait( findKey( data, req.sel, version, shard, &offset ) );
data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(req.sel.getKey(), k), std::max<KeyRef>(req.sel.getKey(), k) ) );
data->readReplyRate.addDelta(1);
KeySelector updated;
if (offset < 0)
@ -1345,7 +1337,6 @@ void getQueuingMetrics( StorageServer* self, StorageQueuingMetricsRequest const&
reply.instanceID = self->instanceID;
reply.bytesInput = self->counters.bytesInput.getValue();
reply.bytesDurable = self->counters.bytesDurable.getValue();
reply.readReplyRate = self->readReplyRate.smoothRate();
reply.storageBytes = self->storage.getStorageBytes();

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long