/* * MultiVersionAssignmentVars.h * * This source file is part of the FoundationDB open source project * * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef FDBCLIENT_MULTIVERSIONASSIGNMENTVARS_H #define FDBCLIENT_MULTIVERSIONASSIGNMENTVARS_H #pragma once #include "flow/ThreadHelper.actor.h" template class AbortableSingleAssignmentVar final : public ThreadSingleAssignmentVar, public ThreadCallback { public: AbortableSingleAssignmentVar(ThreadFuture future, ThreadFuture abortSignal) : future(future), abortSignal(abortSignal), hasBeenSet(false), callbacksCleared(false) { int userParam; ThreadSingleAssignmentVar::addref(); ThreadSingleAssignmentVar::addref(); // abortSignal comes first, because otherwise future could immediately call fire/error and attempt to remove // this callback from abortSignal prematurely abortSignal.callOrSetAsCallback(this, userParam, 0); future.callOrSetAsCallback(this, userParam, 0); } void cancel() override { cancelCallbacks(); ThreadSingleAssignmentVar::cancel(); } void cleanupUnsafe() override { future.getPtr()->releaseMemory(); ThreadSingleAssignmentVar::cleanupUnsafe(); } bool canFire(int notMadeActive) const override { return true; } void fire(const Void& unused, int& userParam) override { lock.enter(); if (!hasBeenSet) { hasBeenSet = true; lock.leave(); if (future.isReady() && !future.isError()) { ThreadSingleAssignmentVar::send(future.get()); } else if (abortSignal.isReady()) { ThreadSingleAssignmentVar::sendError(cluster_version_changed()); } else { ASSERT(false); } } else { lock.leave(); } cancelCallbacks(); ThreadSingleAssignmentVar::delref(); } void error(const Error& e, int& userParam) override { ASSERT(future.isError()); lock.enter(); if (!hasBeenSet) { hasBeenSet = true; lock.leave(); ThreadSingleAssignmentVar::sendError(future.getError()); } else { lock.leave(); } cancelCallbacks(); ThreadSingleAssignmentVar::delref(); } private: ThreadFuture future; ThreadFuture abortSignal; ThreadSpinLock lock; bool hasBeenSet; bool callbacksCleared; void cancelCallbacks() { lock.enter(); if (!callbacksCleared) { callbacksCleared = true; lock.leave(); future.getPtr()->addref(); // Cancel will delref our future, but we don't want to destroy it until this // callback gets destroyed future.getPtr()->cancel(); if (abortSignal.clearCallback(this)) { ThreadSingleAssignmentVar::delref(); } } else { lock.leave(); } } }; template ThreadFuture abortableFuture(ThreadFuture f, ThreadFuture abortSignal) { return ThreadFuture(new AbortableSingleAssignmentVar(f, abortSignal)); } template class DLThreadSingleAssignmentVar final : public ThreadSingleAssignmentVar { public: DLThreadSingleAssignmentVar(Reference api, FdbCApi::FDBFuture* f, std::function extractValue) : api(api), f(f), extractValue(extractValue), futureRefCount(1) { ThreadSingleAssignmentVar::addref(); api->futureSetCallback(f, &futureCallback, this); } ~DLThreadSingleAssignmentVar() override { lock.assertNotEntered(); if (f) { ASSERT_ABORT(futureRefCount == 1); api->futureDestroy(f); } } bool addFutureRef() { lock.enter(); bool destroyed = futureRefCount == 0; if (!destroyed) { ++futureRefCount; } lock.leave(); return !destroyed; } bool delFutureRef() { lock.enter(); if (futureRefCount == 0) { lock.leave(); return true; } bool destroyNow = (--futureRefCount == 0); lock.leave(); if (destroyNow) { api->futureDestroy(f); f = nullptr; } return destroyNow; } void cancel() override { if (addFutureRef()) { api->futureCancel(f); delFutureRef(); } ThreadSingleAssignmentVar::cancel(); } void cleanupUnsafe() override { delFutureRef(); ThreadSingleAssignmentVar::cleanupUnsafe(); } void apply() { FdbCApi::fdb_error_t error = addFutureRef() ? api->futureGetError(f) : error_code_operation_cancelled; if (error != 0) { delFutureRef(); ThreadSingleAssignmentVar::sendError(Error(error)); } else { T val = extractValue(f, api.getPtr()); delFutureRef(); ThreadSingleAssignmentVar::send(val); } ThreadSingleAssignmentVar::delref(); } static void futureCallback(FdbCApi::FDBFuture* f, void* param) { auto sav = (DLThreadSingleAssignmentVar*)param; if (MultiVersionApi::api->callbackOnMainThread) { onMainThreadVoid([sav]() { sav->apply(); }, nullptr); } else { sav->apply(); } } private: const Reference api; FdbCApi::FDBFuture* f; const std::function extractValue; ThreadSpinLock lock; int futureRefCount; }; template ThreadFuture toThreadFuture(Reference api, FdbCApi::FDBFuture* f, std::function extractValue) { return ThreadFuture(new DLThreadSingleAssignmentVar(api, f, extractValue)); } template class MapSingleAssignmentVar final : public ThreadSingleAssignmentVar, ThreadCallback { public: MapSingleAssignmentVar(ThreadFuture source, std::function(ErrorOr)> mapValue) : source(source), mapValue(mapValue) { ThreadSingleAssignmentVar::addref(); int userParam; source.callOrSetAsCallback(this, userParam, 0); } void cancel() override { source.getPtr()->addref(); // Cancel will delref our future, but we don't want to destroy it until this callback // gets destroyed source.getPtr()->cancel(); ThreadSingleAssignmentVar::cancel(); } void cleanupUnsafe() override { source.getPtr()->releaseMemory(); ThreadSingleAssignmentVar::cleanupUnsafe(); } bool canFire(int notMadeActive) const override { return true; } void fire(const Void& unused, int& userParam) override { sendResult(mapValue(source.get())); ThreadSingleAssignmentVar::delref(); } void error(const Error& e, int& userParam) override { sendResult(mapValue(source.getError())); ThreadSingleAssignmentVar::delref(); } private: ThreadFuture source; const std::function(ErrorOr)> mapValue; void sendResult(ErrorOr result) { if (result.isError()) { ThreadSingleAssignmentVar::sendError(result.getError()); } else { ThreadSingleAssignmentVar::send(result.get()); } } }; template ThreadFuture mapThreadFuture(ThreadFuture source, std::function(ErrorOr)> mapValue) { return ThreadFuture(new MapSingleAssignmentVar(source, mapValue)); } template class FlatMapSingleAssignmentVar final : public ThreadSingleAssignmentVar, ThreadCallback { public: FlatMapSingleAssignmentVar(ThreadFuture source, std::function>(ErrorOr)> mapValue) : source(source), mapValue(mapValue), cancelled(false), released(false) { ThreadSingleAssignmentVar::addref(); int userParam; source.callOrSetAsCallback(this, userParam, 0); } void cancel() override { source.getPtr()->addref(); // Cancel will delref our future, but we don't want to destroy it until this callback // gets destroyed source.getPtr()->cancel(); lock.enter(); cancelled = true; if (mappedFuture.isValid()) { lock.leave(); mappedFuture.getPtr()->addref(); mappedFuture.getPtr()->cancel(); } else { lock.leave(); } ThreadSingleAssignmentVar::cancel(); } void cleanupUnsafe() override { source.getPtr()->releaseMemory(); lock.enter(); released = true; if (mappedFuture.isValid()) { lock.leave(); mappedFuture.getPtr()->releaseMemory(); } else { lock.leave(); } ThreadSingleAssignmentVar::cleanupUnsafe(); } bool canFire(int notMadeActive) const override { return true; } void fire(const Void& unused, int& userParam) override { if (mappedFuture.isValid()) { sendResult(mappedFuture.get()); } else { setMappedFuture(mapValue(source.get())); } ThreadSingleAssignmentVar::delref(); } void error(const Error& e, int& userParam) override { if (mappedFuture.isValid()) { sendResult(mappedFuture.getError()); } else { setMappedFuture(mapValue(source.getError())); } ThreadSingleAssignmentVar::delref(); } private: ThreadFuture source; ThreadFuture mappedFuture; bool cancelled; bool released; const std::function>(ErrorOr)> mapValue; ThreadSpinLock lock; void setMappedFuture(ErrorOr> f) { if (f.isError()) { sendResult(f.getError()); } else { lock.enter(); mappedFuture = f.get(); bool doCancel = cancelled; bool doRelease = released; lock.leave(); if (doCancel) { mappedFuture.getPtr()->addref(); mappedFuture.getPtr()->cancel(); } if (doRelease) { mappedFuture.getPtr()->releaseMemory(); } int userParam; ThreadSingleAssignmentVar::addref(); mappedFuture.callOrSetAsCallback(this, userParam, 0); } } void sendResult(ErrorOr result) { if (result.isError()) { ThreadSingleAssignmentVar::sendError(result.getError()); } else { ThreadSingleAssignmentVar::send(result.get()); } } }; template ThreadFuture flatMapThreadFuture(ThreadFuture source, std::function>(ErrorOr)> mapValue) { return ThreadFuture(new FlatMapSingleAssignmentVar(source, mapValue)); } #endif