mirror of
https://github.com/facebook/rocksdb.git
synced 2025-04-20 03:20:00 +08:00
Fix some race conditions in listener_test (#13385)
Summary: There are some data races reported for this test. This PR fixes two races: 1) Test main body and event listener callback race to access a variable `call_count_` in the test's event listener. 2) Test event listener access `ColumnFamilyData::current_` during `OnFlushCompleted` without locking DB mutex and raced with a background compaction job. Example [run](https://github.com/facebook/rocksdb/actions/runs/13208433475/job/36876956677?fbclid=IwZXh0bgNhZW0CMTEAAR0_gG1Brx7I6bhN3PVD267c2d06GSf7QBEQ8cbNcFHNvn-ZX2JWHtr05qg_aem_915NHkfFh-6cMk83uTHWKw) Pull Request resolved: https://github.com/facebook/rocksdb/pull/13385 Reviewed By: cbi42 Differential Revision: D69333371 Pulled By: jowlyzhang fbshipit-source-id: dee4a5f5e161d9b1f5b47b37163ee5b91fe18977
This commit is contained in:
parent
dd01f73e26
commit
d48af21386
@ -380,9 +380,9 @@ class ColumnFamilyData {
|
||||
return mem()->GetFirstSequenceNumber() == 0 && imm()->NumNotFlushed() == 0;
|
||||
}
|
||||
|
||||
Version* current() { return current_; }
|
||||
Version* dummy_versions() { return dummy_versions_; }
|
||||
void SetCurrent(Version* _current);
|
||||
Version* current() { return current_; } // REQUIRE: DB mutex held
|
||||
void SetCurrent(Version* _current); // REQUIRE: DB mutex held
|
||||
uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held
|
||||
uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
|
||||
uint64_t GetLiveSstFilesSize() const; // REQUIRE: DB mutex held
|
||||
|
@ -1285,7 +1285,9 @@ class BlobDBJobLevelEventListenerTest : public EventListener {
|
||||
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
|
||||
EXPECT_NE(cfd, nullptr);
|
||||
|
||||
test_->dbfull()->TEST_LockMutex();
|
||||
Version* const current = cfd->current();
|
||||
test_->dbfull()->TEST_UnlockMutex();
|
||||
EXPECT_NE(current, nullptr);
|
||||
|
||||
const VersionStorageInfo* const storage_info = current->storage_info();
|
||||
@ -1325,10 +1327,9 @@ class BlobDBJobLevelEventListenerTest : public EventListener {
|
||||
}
|
||||
|
||||
void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
|
||||
call_count_++;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
IncreaseCallCount(/*mutex_locked*/ true);
|
||||
flushed_files_.push_back(info.file_path);
|
||||
}
|
||||
|
||||
@ -1339,7 +1340,7 @@ class BlobDBJobLevelEventListenerTest : public EventListener {
|
||||
|
||||
void OnCompactionCompleted(DB* /*db*/,
|
||||
const CompactionJobInfo& info) override {
|
||||
call_count_++;
|
||||
IncreaseCallCount(/*mutex_locked*/ false);
|
||||
|
||||
EXPECT_EQ(info.blob_compression_type, kNoCompression);
|
||||
|
||||
@ -1355,12 +1356,31 @@ class BlobDBJobLevelEventListenerTest : public EventListener {
|
||||
}
|
||||
}
|
||||
|
||||
void IncreaseCallCount(bool mutex_locked) {
|
||||
if (!mutex_locked) {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
call_count_++;
|
||||
} else {
|
||||
call_count_++;
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t GetCallCount() {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
return call_count_;
|
||||
}
|
||||
|
||||
void ResetCallCount() {
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
call_count_ = 0;
|
||||
}
|
||||
|
||||
EventListenerTest* test_;
|
||||
uint32_t call_count_;
|
||||
|
||||
private:
|
||||
std::vector<std::string> flushed_files_;
|
||||
std::mutex mutex_;
|
||||
std::vector<std::string> flushed_files_;
|
||||
uint32_t call_count_;
|
||||
};
|
||||
|
||||
// Test OnFlushCompleted EventListener called for blob files
|
||||
@ -1389,7 +1409,7 @@ TEST_F(EventListenerTest, BlobDBOnFlushCompleted) {
|
||||
ASSERT_EQ(Get("Key2"), "blob_value2");
|
||||
ASSERT_EQ(Get("Key3"), "blob_value3");
|
||||
|
||||
ASSERT_GT(blob_event_listener->call_count_, 0U);
|
||||
ASSERT_GT(blob_event_listener->GetCallCount(), 0U);
|
||||
}
|
||||
|
||||
// Test OnCompactionCompleted EventListener called for blob files
|
||||
@ -1423,7 +1443,7 @@ TEST_F(EventListenerTest, BlobDBOnCompactionCompleted) {
|
||||
ASSERT_OK(Put("Key6", "blob_value6"));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
blob_event_listener->call_count_ = 0;
|
||||
blob_event_listener->ResetCallCount();
|
||||
constexpr Slice* begin = nullptr;
|
||||
constexpr Slice* end = nullptr;
|
||||
|
||||
@ -1432,7 +1452,7 @@ TEST_F(EventListenerTest, BlobDBOnCompactionCompleted) {
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
|
||||
|
||||
// Make sure, OnCompactionCompleted is called.
|
||||
ASSERT_GT(blob_event_listener->call_count_, 0U);
|
||||
ASSERT_GT(blob_event_listener->GetCallCount(), 0U);
|
||||
}
|
||||
|
||||
// Test CompactFiles calls OnCompactionCompleted EventListener for blob files
|
||||
|
Loading…
x
Reference in New Issue
Block a user