mirror of
https://github.com/apple/foundationdb.git
synced 2025-05-14 18:02:31 +08:00
Merge pull request #3969 from Daniel-B-Smith/rocksdb-unsafe-fsync
RocksDB grab bag
This commit is contained in:
commit
fc8b57189e
@ -27,6 +27,9 @@ rocksdb::ColumnFamilyOptions getCFOptions() {
|
||||
rocksdb::ColumnFamilyOptions options;
|
||||
options.level_compaction_dynamic_level_bytes = true;
|
||||
options.OptimizeLevelStyleCompaction(SERVER_KNOBS->ROCKSDB_MEMTABLE_BYTES);
|
||||
if (SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS > 0) {
|
||||
options.periodic_compaction_seconds = SERVER_KNOBS->ROCKSDB_PERIODIC_COMPACTION_SECONDS;
|
||||
}
|
||||
// Compact sstables when there's too much deleted stuff.
|
||||
options.table_properties_collector_factories = { rocksdb::NewCompactOnDeletionCollectorFactory(128, 1) };
|
||||
return options;
|
||||
@ -52,7 +55,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
||||
|
||||
explicit Writer(DB& db, UID id) : db(db), id(id) {}
|
||||
|
||||
~Writer() {
|
||||
~Writer() override {
|
||||
if (db) {
|
||||
delete db;
|
||||
}
|
||||
@ -85,6 +88,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
||||
TraceEvent(SevError, "RocksDBError").detail("Error", status.ToString()).detail("Method", "Open");
|
||||
a.done.sendError(statusToError(status));
|
||||
} else {
|
||||
TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Open");
|
||||
a.done.send(Void());
|
||||
}
|
||||
}
|
||||
@ -96,7 +100,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
||||
};
|
||||
void action(CommitAction& a) {
|
||||
rocksdb::WriteOptions options;
|
||||
options.sync = true;
|
||||
options.sync = !SERVER_KNOBS->ROCKSDB_UNSAFE_AUTO_FSYNC;
|
||||
auto s = db->Write(options, a.batchToCommit.get());
|
||||
if (!s.ok()) {
|
||||
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Commit");
|
||||
@ -114,6 +118,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
||||
double getTimeEstimate() override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
|
||||
};
|
||||
void action(CloseAction& a) {
|
||||
if (db == nullptr) {
|
||||
a.done.send(Void());
|
||||
return;
|
||||
}
|
||||
auto s = db->Close();
|
||||
if (!s.ok()) {
|
||||
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Close");
|
||||
@ -121,8 +129,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
||||
if (a.deleteOnClose) {
|
||||
std::vector<rocksdb::ColumnFamilyDescriptor> defaultCF = { rocksdb::ColumnFamilyDescriptor{
|
||||
"default", getCFOptions() } };
|
||||
rocksdb::DestroyDB(a.path, getOptions(), defaultCF);
|
||||
s = rocksdb::DestroyDB(a.path, getOptions(), defaultCF);
|
||||
if (!s.ok()) {
|
||||
TraceEvent(SevError, "RocksDBError").detail("Error", s.ToString()).detail("Method", "Destroy");
|
||||
} else {
|
||||
TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Destroy");
|
||||
}
|
||||
}
|
||||
TraceEvent(SevInfo, "RocksDB").detail("Path", a.path).detail("Method", "Close");
|
||||
a.done.send(Void());
|
||||
}
|
||||
};
|
||||
@ -266,7 +280,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
||||
UID id;
|
||||
Reference<IThreadPool> writeThread;
|
||||
Reference<IThreadPool> readThreads;
|
||||
unsigned nReaders = 16;
|
||||
Promise<Void> errorPromise;
|
||||
Promise<Void> closePromise;
|
||||
std::unique_ptr<rocksdb::WriteBatch> writeBatch;
|
||||
@ -278,7 +291,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
||||
writeThread = createGenericThreadPool();
|
||||
readThreads = createGenericThreadPool();
|
||||
writeThread->addThread(new Writer(db, id));
|
||||
for (unsigned i = 0; i < nReaders; ++i) {
|
||||
for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
|
||||
readThreads->addThread(new Reader(db));
|
||||
}
|
||||
}
|
||||
@ -372,16 +385,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
|
||||
}
|
||||
|
||||
StorageBytes getStorageBytes() override {
|
||||
uint64_t live = 0;
|
||||
ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kEstimateLiveDataSize, &live));
|
||||
|
||||
int64_t free;
|
||||
int64_t total;
|
||||
|
||||
uint64_t sstBytes = 0;
|
||||
ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kTotalSstFilesSize, &sstBytes));
|
||||
uint64_t memtableBytes = 0;
|
||||
ASSERT(db->GetIntProperty(rocksdb::DB::Properties::kSizeAllMemTables, &memtableBytes));
|
||||
g_network->getDiskBytes(path, free, total);
|
||||
|
||||
return StorageBytes(free, total, sstBytes + memtableBytes, free);
|
||||
return StorageBytes(free, total, live, free);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -98,7 +98,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
||||
init( PEEK_STATS_SLOW_RATIO, 0.5 );
|
||||
init( PUSH_RESET_INTERVAL, 300.0 ); if ( randomize && BUGGIFY ) PUSH_RESET_INTERVAL = 20.0;
|
||||
init( PUSH_MAX_LATENCY, 0.5 ); if ( randomize && BUGGIFY ) PUSH_MAX_LATENCY = 0.0;
|
||||
init( PUSH_STATS_INTERVAL, 10.0 );
|
||||
init( PUSH_STATS_INTERVAL, 10.0 );
|
||||
init( PUSH_STATS_SLOW_AMOUNT, 2 );
|
||||
init( PUSH_STATS_SLOW_RATIO, 0.5 );
|
||||
|
||||
@ -318,7 +318,10 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
||||
|
||||
// KeyValueStoreRocksDB
|
||||
init( ROCKSDB_BACKGROUND_PARALLELISM, 0 );
|
||||
init( ROCKSDB_READ_PARALLELISM, 4 );
|
||||
init( ROCKSDB_MEMTABLE_BYTES, 512 * 1024 * 1024 );
|
||||
init( ROCKSDB_UNSAFE_AUTO_FSYNC, false );
|
||||
init( ROCKSDB_PERIODIC_COMPACTION_SECONDS, 0 );
|
||||
|
||||
// Leader election
|
||||
bool longLeaderElection = randomize && BUGGIFY;
|
||||
|
@ -253,7 +253,10 @@ public:
|
||||
|
||||
// KeyValueStoreRocksDB
|
||||
int ROCKSDB_BACKGROUND_PARALLELISM;
|
||||
int ROCKSDB_READ_PARALLELISM;
|
||||
int64_t ROCKSDB_MEMTABLE_BYTES;
|
||||
bool ROCKSDB_UNSAFE_AUTO_FSYNC;
|
||||
int64_t ROCKSDB_PERIODIC_COMPACTION_SECONDS;
|
||||
|
||||
// Leader election
|
||||
int MAX_NOTIFICATIONS;
|
||||
|
Loading…
x
Reference in New Issue
Block a user