fix: The destructor of a parallel stream fragment could be called after the parallel stream was destroyed, by making parallelStream reference counted that problem is avoided

This commit is contained in:
Evan Tschannen 2021-06-22 17:30:51 -07:00
parent 00469b7e2d
commit afaa66ba47
3 changed files with 14 additions and 12 deletions

View File

@ -3727,7 +3727,8 @@ ACTOR Future<Void> getRangeStream(PromiseStream<RangeResult> _results,
TransactionInfo info,
TagSet tags) {
state ParallelStream<RangeResult> results(_results, CLIENT_KNOBS->RANGESTREAM_BUFFERED_FRAGMENTS_LIMIT);
state Reference<ParallelStream<RangeResult>> results =
makeReference<ParallelStream<RangeResult>>(_results, CLIENT_KNOBS->RANGESTREAM_BUFFERED_FRAGMENTS_LIMIT);
// FIXME: better handling to disable row limits
ASSERT(!limits.hasRowLimit());
@ -3749,7 +3750,7 @@ ACTOR Future<Void> getRangeStream(PromiseStream<RangeResult> _results,
}
if (b >= e) {
wait(results.finish());
wait(results->finish());
return Void();
}
@ -3784,7 +3785,7 @@ ACTOR Future<Void> getRangeStream(PromiseStream<RangeResult> _results,
if (toSend[useIdx].empty()) {
continue;
}
ParallelStream<RangeResult>::Fragment* fragment = wait(results.createFragment());
ParallelStream<RangeResult>::Fragment* fragment = wait(results->createFragment());
outstandingRequests.push_back(getRangeStreamFragment(
fragment, cx, trLogInfo, version, toSend[useIdx], limits, snapshot, reverse, info, tags, span.context));
}
@ -3794,7 +3795,7 @@ ACTOR Future<Void> getRangeStream(PromiseStream<RangeResult> _results,
b = shardIntersection.end;
}
}
wait(waitForAll(outstandingRequests) && results.finish());
wait(waitForAll(outstandingRequests) && results->finish());
return Void();
}

View File

@ -61,7 +61,8 @@ TEST_CASE("/fdbclient/ParallelStream") {
state PromiseStream<ParallelStreamTest::TestValue> results;
state size_t bufferLimit = deterministicRandom()->randomInt(0, 21);
state size_t numProducers = deterministicRandom()->randomInt(1, 1001);
state ParallelStream<ParallelStreamTest::TestValue> parallelStream(results, bufferLimit);
state Reference<ParallelStream<ParallelStreamTest::TestValue>> parallelStream =
makeReference<ParallelStream<ParallelStreamTest::TestValue>>(results, bufferLimit);
state Future<Void> consumer = ParallelStreamTest::consume(results.getFuture(), numProducers);
state std::vector<Future<Void>> producers;
TraceEvent("StartingParallelStreamTest")
@ -69,10 +70,10 @@ TEST_CASE("/fdbclient/ParallelStream") {
.detail("NumProducers", numProducers);
state int i = 0;
for (; i < numProducers; ++i) {
ParallelStream<ParallelStreamTest::TestValue>::Fragment* fragment = wait(parallelStream.createFragment());
ParallelStream<ParallelStreamTest::TestValue>::Fragment* fragment = wait(parallelStream->createFragment());
producers.push_back(ParallelStreamTest::produce(fragment, ParallelStreamTest::TestValue(i)));
}
wait(parallelStream.finish());
wait(parallelStream->finish());
wait(consumer);
return Void();
}

View File

@ -34,7 +34,7 @@
// ParallelStream is used to fetch data from multiple streams in parallel and then merge them back into a single stream
// in order.
template <class T>
class ParallelStream {
class ParallelStream : public ReferenceCounted<ParallelStream<T>> {
BoundedFlowLock semaphore;
struct FragmentConstructorTag {
explicit FragmentConstructorTag() = default;
@ -43,13 +43,13 @@ class ParallelStream {
public:
// A Fragment is a single stream that will get results to be merged back into the main output stream
class Fragment : public ReferenceCounted<Fragment> {
ParallelStream* parallelStream;
Reference<ParallelStream<T>> parallelStream;
PromiseStream<T> stream;
BoundedFlowLock::Releaser releaser;
friend class ParallelStream;
public:
Fragment(ParallelStream* parallelStream, int64_t permitNumber, FragmentConstructorTag)
Fragment(Reference<ParallelStream<T>> parallelStream, int64_t permitNumber, FragmentConstructorTag)
: parallelStream(parallelStream), releaser(&parallelStream->semaphore, permitNumber) {}
template <class U>
void send(U&& value) {
@ -110,14 +110,14 @@ public:
}
// Creates a fragment to get merged into the main output stream
ACTOR static Future<Fragment*> createFragmentImpl(ParallelStream<T>* self) {
ACTOR static Future<Fragment*> createFragmentImpl(Reference<ParallelStream<T>> self) {
int64_t permitNumber = wait(self->semaphore.take());
auto fragment = makeReference<Fragment>(self, permitNumber, FragmentConstructorTag());
self->fragments.send(fragment);
return fragment.getPtr();
}
Future<Fragment*> createFragment() { return createFragmentImpl(this); }
Future<Fragment*> createFragment() { return createFragmentImpl(Reference<ParallelStream<T>>::addRef(this)); }
Future<Void> finish() {
fragments.sendError(end_of_stream());