foundationdb/documentation/tutorial/tutorial.actor.cpp
Konrad `ktoso` Malawski c26aa0b2a3
Introduce initial Swift support in fdbserver (#10156)
* [fdbserver] workaround the FRT type layout issue to get Swfit getVersion working

* MasterData.actor.h: fix comment typo

* masterserver.swift: some tweaks

* masterserver.swift: remove getVersion function, use the method

* masterserver.swift: print replied version to output for tracing

* [swift] add radar links for C++ interop issues found in getVersion bringup

* Update fdbserver.actor.cpp

* Migrate MasterData closer to full reference type

This removes the workaround for the FRT type layout issue, and gets us closer to making MasterData a full reference type

* [interop] require a new toolchain (>= Oct 19th) to build

* [Swift] fix computation of toAdd for getVersion Swift implementation

* add Swift to FDBClient and add async `atLeast` to NotifiedVersion

* fix

* use new atLeast API in master server

* =build fixup link dependencies in swift fdbclient

* clocks

* +clock implement Clock using Flow's notion of time

* [interop] workaround the immortal retain/release issue

* [swift] add script to get latest centos toolchain

* always install swift hooks; not only in "test" mode

* simulator - first thing running WIP

* cleanups

* more cleanup

* working snapshot

* remove sim debug printlns

* added convenience for whenAtLeast

* try Alex's workaround

* annotate nonnull

* cleanup clock a little bit

* fix missing impls after rebase

* Undo the swift_lookup_Map_UID_CommitProxyVersionReplies workaround

No longer needed - the issue was retain/release

* [flow][swift] add Swift version of BUGGIFY

* [swiftication] add CounterValue type to provide value semantics for Counter types on the Swift side

* remove extraneous requestingProxyUID local

* masterserver: initial Swift state prototype

* [interop] make the Swiftied getVersion work

* masterserver - remove the C++ implementation (it can't be supported as state is now missing)

* Remove unnecessary SWIFT_CXX_REF_IMMORTAL annotations from Flow types

* Remove C++ implementation of CommitProxyVersionReplies - it's in Swift now

* [swift interop] remove more SWIFT_CXX_REF_IMMORTAL

* [swift interop] add SWIFT_CXX_IMMORTAL_SINGLETON_TYPE annotation for semanticly meaningful immortal uses

* rename SWIFT_CXX_REF_IMMORTAL -> UNSAFE_SWIFT_CXX_IMMORTAL_REF

* Move master server waitForPrev to swift

* =build fix linking swift in all modules

* =build single link option

* =cmake avoid manual math, just get "last" element from list

* implement Streams support (#18)

* [interop] update to new toolchain #6

* [interop] remove C++ vtable linking workarounds

* [interop] make MasterData proper reference counted SWIFT_CXX_REF_MASTERDATA

* [interop] use Swift array to pass UIDs to registerLastCommitProxyVersionReplies

* [interop] expose MasterServer actor to C++ without wrapper struct

* [interop] we no longer need expose on methods 🥳

* [interop] initial prototype of storing CheckedContinuation on the C++ side

* Example of invoking a synchronous swift function from a C++ unit test. (#21)

* move all "tests" we have in Swift, and priority support into real modules (#24)

* Make set continuation functions inline

* Split flow_swift into flow_swift and flow_swift_future to break circular dependency

* rename SwiftContinuationCallbackStruct to FlowCallbackForSwiftContinuation

* Future interop: use a method in a class template for continuation set call

* Revert "Merge pull request #22 from FoundationDB/cpp-continuation" (#30)

* Basic Swift Guide (#29)

Co-authored-by: Alex Lorenz <arphaman@gmail.com>

* Revert "Revert "Merge pull request #22 from FoundationDB/cpp-continuation" (#30)"

This reverts commit c025fe6258c4c4904d5e70cd549d408bb61f2802.

* Restore the C++ continuation, but it seems waitValue is broken for CInt somehow now

* disable broken tests - waitValue not accessible

* Streams can be async iterated over (#27)

Co-authored-by: Alex Lorenz <arphaman@gmail.com>

* remove work in progress things (#35)

* remove some not used (yet) code

* remove expose func for CInt, it's a primitive so we always have witness info (#37)

* +masterdata implement provideVersions in Swift (#36)

* serveLiveCommittedVersion in Swift (#38)

* Port updateLiveCommittedVersion to swift (#33)

Co-authored-by: Konrad `ktoso` Malawski <konrad_malawski@apple.com>

* Implement updateRecoveryData in Swift (#39)

Co-authored-by: Alex Lorenz <arphaman@gmail.com>

* Simplify flow_swift to avoid multiple targets and generate separate CheckedContinuation header

* Uncomment test which was blocked on extensions not being picked up (#31)

* [interop] Use a separate target for Swift-to-C++ header generation

* reduce boilerplate in future and stream support (#41)

* [interop] require interop v8 - that will fix linker issue (https://github.com/apple/swift/issues/62448)

* [interop] fix swift_stream_support.h Swift include

* [interop] bump up requirement to version 9

* [interop] Generalize the Flow.Optional -> Swift.Optional conversion using generics

* [WIP] masterServer func in Swift (#45)

* [interop] Try conforms_to with a SWIFT_CONFORMS_TO macro for Optional conformance (#49)

* [interop] include FlowOptionalProtocol source file when generating Flow_CheckedContinuation.h

This header generation step depends on the import of the C++ Flow module, which requires the presence of FlowOptionalProtocol

* conform Future to FlowFutureOps

* some notes

* move to value() so we can use discardable result for Flow.Void

* make calling into Swift async funcs nicer by returning Flow Futures

* [interop] hide initial use of FlowCheckedContinuation in flow.h to break dependency cycle

* [fdbserver] fix an EncryptionOpsUtils.h modularization issue (showed up with modularized libc++)

* Pass GCC toolchain using CMAKE_Swift_COMPILE_EXTERNAL_TOOLCHAIN to Swift's clang importer

* [interop] drop the no longer needed libstdc++ include directories

* [cmake] add a configuration check to ensure Swift can import C++ standard library

* [swift] include msgpack from msgpack_DIR

* [interop] make sure the FDB module maps have 'export' directive

* add import 'flow_swift' to swift_fdbserver_cxx_swift_value_conformance.swift

This is needed for CONFORMS_TO to work in imported modules

* make sure the Swift -> C++ manually bridged function signature matches generated signature

* [interop][workaround] force back use of @expose attribute before _Concurrency issue is fixed

* [interop] make getResolutionBalancer return a pointer to allow Swift to use it

We should revert back to a reference once compiler allows references again

* [interop] add a workaround for 'pop' being marked as unsafe in Swift

* masterserver.swift: MasterData returns the Swift actor pointer in an unsafe manner

* Add a 'getCopy' method to AsyncVar to make it more Swift friendly

* [interop] bump up the toolchain requirement

* Revert "[interop][workaround] force back use of @expose attribute before _Concurrency issue is fixed"

This reverts commit b01b271a76d1677bbb7c5c9c64cdad4b8b2b9612.

* [interop] add FIXME comments highlighting new issue workarounds

* [interop] adopt the new C++ interoperability compiler flag

* [interop] generate swift compile commands

* Do not deduplicate Swift compilation commands

* [interop] generate swift compile commands

* Do not deduplicate Swift compilation commands

* flow actorcompiler.h: add a SWIFT_ACTOR empty macro definition

This is needed to make the actor files parsable by clangd

* [cmake] add missing dependencies

* experimental cross compile

* [cmake] fix triple in cross-compiled cmake flags

* [interop] update to interop toolchain version 16

* [x-compile] add flags for cross-compiling boost

* cleanup x-compile cmake changes

* [cmake] fix typo in CMAKE_Swift_COMPILER_EXTERNAL_TOOLCHAIN config variable

* [interop] pass MasterDataActor from Swift to C++ and back to Swift

* [fdbserver] Swift->C++ header generation for FDBServer should use same module cache path

* Update swift_get_latest_toolchain.sh to fetch 5.9 toochains

* set HAVE_FLAG_SEARCH_PATHS_FIRST for cross compilation

* Resolve conflicts in net2/sim2/actors, can't build yet

* undo SWIFT_ACTOR changes, not necessary for merge

* guard c++ compiler flags with is_cxx_compile

* Update flow/actorcompiler/ActorParser.cs

Co-authored-by: Evan Wilde <etceterawilde@gmail.com>

* update the boost dependency

* Include boost directory from the container for Swift

* conform flow's Optional to FlowOptionalProtocol again

* Guard entire RocksDBLogForwarder.h with SSD_ROCKSDB_EXPERIMENTAL to avoid failing on missing rocksdb APIs

* remove extraneous merge marker

* [swift] update swift_test_streams.swifto to use vars in more places

* Add header guard to flow/include/flow/ThreadSafeQueue.h to fix moduralization issue

* Update net and sim impls

* [cmake] use prebuilt libc++ boost only when we're actually using libc++

* [fdbserver] Swift->C++ header generation for FDBServer should use same module cache path

* fixups after merge

* remove CustomStringConvertible conformance that would not be used

* remove self-caused deprecation warnings in future_support

* handle newly added task priority

* reformatting

* future: make value() not mutating

* remove FIXME, not needed anymore

* future: clarify why as functions

* Support TraceEvent in Swift

* Enable TraceEvent using a class wrapper in Swift

* prearing WITH_SWIFT flag

* wip disabled failing Go stuff

* cleanup WITH_SWIFT_FLAG and reenable Go

* wip disabled failing Go stuff

* move setting flag before printing it

* Add SWIFT_IDE_SETUP and cleanup guides and build a bit

* Revert "Wipe packet buffers that held serialized WipedString (#10018)"

This reverts commit e2df6e33029897360f8e11b3aea8fef97393a98c.

* [Swift] Compile workaround in KeyBackedRangeMap; default init is incorrect

* [interop] do not add FlowFutureOps conformance when building flow clang module for Flow checked continuation header pre-generation

* make sure to show  -DUSE_LIBCXX=OFF in readme

* readme updates

* do not print to stderr

* Update Swift and C++ code to build with latest Swift 5.9 toolchain now that we no longer support universal references and bridge the methods that take in a constant reference template parameter correctly

* Fix SERVER_KNOBS and enable use them for masterserver

* Bump to C++20, Swift is now able to handle it as well

* Put waitForPrev behind FLOW_WITH_SWIFT knob

* Forward declare updateLiveCommittedVersion

* Remove unused code

* fix wrong condition set for updateLiveCommittedVersion

* Revert "Revert "Wipe packet buffers that held serialized WipedString (#10018)""

This reverts commit 5ad8dce0525fb1844664ed2ccd7ba595db6913dd.

* Enable go-bindings in cmake

* Revert "Revert "Wipe packet buffers that held serialized WipedString (#10018)""

This reverts commit 5ad8dce0525fb1844664ed2ccd7ba595db6913dd.

* USE_SWIFT flag so we "build without swift" until ready to by default

* uncomment a few tests which were disabled during USE_SWIFT enablement

* the option is WITH_SWIFT, not USE

* formatting

* Fix masterserver compile error

* Fix some build errors.

How did it not merge cleanly? :/

* remove initializer list from constructor

* Expect Swift toolchain only if WITH_SWIFT is enabled

* Don't require Flow_CheckedContinuation when Swift is disabled

* Don't compile FlowCheckedContinuation when WITH_SWIFT=OFF

* No-op Swift macros

* More compile guards

* fix typo

* Run clang-format

* Guard swift/bridging include in fdbrpc

* Remove printf to pass the test

* Remove some more printf to avoid potential issues

TODO: Need to be TraceEvents instead

* Remove __has_feature(nullability) as its only used in Swift

* Don't use __FILENAME__

* Don't call generate_module_map outside WITH_SWIFT

* Add some more cmake stuff under WITH_SWIFT guard

* Some more guards

* Bring back TLSTest.cpp

* clang-format

* fix comment formatting

* Remove unused command line arg

* fix cmake formatting in some files

* Address some review comments

* fix clang-format error

---------

Co-authored-by: Alex Lorenz <arphaman@gmail.com>
Co-authored-by: Russell Sears <russell_sears@apple.com>
Co-authored-by: Evan Wilde <etceterawilde@gmail.com>
Co-authored-by: Alex Lorenz <aleksei_lorenz@apple.com>
Co-authored-by: Vishesh Yadav <vishesh_yadav@apple.com>
Co-authored-by: Vishesh Yadav <vishesh3y@gmail.com>
2023-06-02 16:09:28 -05:00

607 lines
17 KiB
C++

/*
* tutorial.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fmt/format.h"
#include "flow/flow.h"
#include "flow/Platform.h"
#include "flow/DeterministicRandom.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "flow/TLSConfig.actor.h"
#include <functional>
#include <unordered_map>
#include <memory>
#include <iostream>
#include "flow/actorcompiler.h"
NetworkAddress serverAddress;
enum TutorialWellKnownEndpoints {
WLTOKEN_SIMPLE_KV_SERVER = WLTOKEN_FIRST_AVAILABLE,
WLTOKEN_ECHO_SERVER,
WLTOKEN_COUNT_IN_TUTORIAL
};
// this is a simple actor that will report how long
// it is already running once a second.
ACTOR Future<Void> simpleTimer() {
// we need to remember the time when we first
// started.
// This needs to be a state-variable because
// we will use it in different parts of the
// actor. If you don't understand how state
// variables work, it is a good idea to remove
// the state keyword here and look at the
// generated C++ code from the actor compiler.
state double start_time = g_network->now();
loop {
wait(delay(1.0));
std::cout << format("Time: %.2f\n", g_network->now() - start_time);
}
}
// A actor that demonstrates how choose-when blocks work.
ACTOR Future<Void> someFuture(Future<int> ready) {
// loop choose {} works as well here - the braces are optional
loop choose {
when(wait(delay(0.5))) {
std::cout << "Still waiting...\n";
}
when(int r = wait(ready)) {
std::cout << format("Ready %d\n", r);
wait(delay(double(r)));
std::cout << "Done\n";
return Void();
}
}
}
ACTOR Future<Void> promiseDemo() {
state Promise<int> promise;
state Future<Void> f = someFuture(promise.getFuture());
wait(delay(3.0));
promise.send(2);
wait(f);
return Void();
}
ACTOR Future<Void> eventLoop(AsyncTrigger* trigger) {
loop choose {
when(wait(delay(0.5))) {
std::cout << "Still waiting...\n";
}
when(wait(trigger->onTrigger())) {
std::cout << "Triggered!\n";
}
}
}
ACTOR Future<Void> triggerDemo() {
state int runs = 1;
state AsyncTrigger trigger;
state Future<Void> triggerLoop = eventLoop(&trigger);
while (++runs < 10) {
wait(delay(1.0));
std::cout << "trigger..";
trigger.trigger();
}
std::cout << "Done.";
return Void();
}
struct EchoServerInterface {
constexpr static FileIdentifier file_identifier = 3152015;
RequestStream<struct GetInterfaceRequest> getInterface;
RequestStream<struct EchoRequest> echo;
RequestStream<struct ReverseRequest> reverse;
RequestStream<struct StreamRequest> stream;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, echo, reverse, stream);
}
};
struct GetInterfaceRequest {
constexpr static FileIdentifier file_identifier = 12004156;
ReplyPromise<EchoServerInterface> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
struct EchoRequest {
constexpr static FileIdentifier file_identifier = 10624019;
std::string message;
// this variable has to be called reply!
ReplyPromise<std::string> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, message, reply);
}
};
struct ReverseRequest {
constexpr static FileIdentifier file_identifier = 10765955;
std::string message;
// this variable has to be called reply!
ReplyPromise<std::string> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, message, reply);
}
};
struct StreamReply : ReplyPromiseStreamReply {
constexpr static FileIdentifier file_identifier = 440804;
int index = 0;
StreamReply() = default;
explicit StreamReply(int index) : index(index) {}
size_t expectedSize() const { return 2e6; }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, ReplyPromiseStreamReply::acknowledgeToken, ReplyPromiseStreamReply::sequence, index);
}
};
struct StreamRequest {
constexpr static FileIdentifier file_identifier = 5410805;
ReplyPromiseStream<StreamReply> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
uint64_t tokenCounter = 1;
ACTOR Future<Void> echoServer() {
state EchoServerInterface echoServer;
echoServer.getInterface.makeWellKnownEndpoint(WLTOKEN_ECHO_SERVER, TaskPriority::DefaultEndpoint);
loop {
try {
choose {
when(GetInterfaceRequest req = waitNext(echoServer.getInterface.getFuture())) {
req.reply.send(echoServer);
}
when(EchoRequest req = waitNext(echoServer.echo.getFuture())) {
req.reply.send(req.message);
}
when(ReverseRequest req = waitNext(echoServer.reverse.getFuture())) {
req.reply.send(std::string(req.message.rbegin(), req.message.rend()));
}
when(state StreamRequest req = waitNext(echoServer.stream.getFuture())) {
req.reply.setByteLimit(1024);
state int i = 0;
for (; i < 100; ++i) {
wait(req.reply.onReady());
std::cout << "Send " << i << std::endl;
req.reply.send(StreamReply{ i });
}
req.reply.sendError(end_of_stream());
}
}
} catch (Error& e) {
if (e.code() != error_code_operation_obsolete) {
fprintf(stderr, "Error: %s\n", e.what());
throw e;
}
}
}
}
ACTOR Future<Void> echoClient() {
state EchoServerInterface server;
server.getInterface =
RequestStream<GetInterfaceRequest>(Endpoint::wellKnown({ serverAddress }, WLTOKEN_ECHO_SERVER));
EchoServerInterface s = wait(server.getInterface.getReply(GetInterfaceRequest()));
server = s;
EchoRequest echoRequest;
echoRequest.message = "Hello World";
std::string echoMessage = wait(server.echo.getReply(echoRequest));
std::cout << format("Sent %s to echo, received %s\n", "Hello World", echoMessage.c_str());
ReverseRequest reverseRequest;
reverseRequest.message = "Hello World";
std::string reverseString = wait(server.reverse.getReply(reverseRequest));
std::cout << format("Sent %s to reverse, received %s\n", "Hello World", reverseString.c_str());
state ReplyPromiseStream<StreamReply> stream = server.stream.getReplyStream(StreamRequest{});
state int j = 0;
try {
loop {
StreamReply rep = waitNext(stream.getFuture());
std::cout << "Rep: " << rep.index << std::endl;
ASSERT(rep.index == j++);
}
} catch (Error& e) {
ASSERT(e.code() == error_code_end_of_stream || e.code() == error_code_connection_failed);
}
return Void();
}
struct SimpleKeyValueStoreInterface {
constexpr static FileIdentifier file_identifier = 8226647;
RequestStream<struct GetKVInterface> connect;
RequestStream<struct GetRequest> get;
RequestStream<struct SetRequest> set;
RequestStream<struct ClearRequest> clear;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, connect, get, set, clear);
}
};
struct GetKVInterface {
constexpr static FileIdentifier file_identifier = 8062308;
ReplyPromise<SimpleKeyValueStoreInterface> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reply);
}
};
struct GetRequest {
constexpr static FileIdentifier file_identifier = 6983506;
std::string key;
ReplyPromise<std::string> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, key, reply);
}
};
struct SetRequest {
constexpr static FileIdentifier file_identifier = 7554186;
std::string key;
std::string value;
ReplyPromise<Void> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, key, value, reply);
}
};
struct ClearRequest {
constexpr static FileIdentifier file_identifier = 8500026;
std::string from;
std::string to;
ReplyPromise<Void> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, from, to, reply);
}
};
ACTOR Future<Void> kvStoreServer() {
state SimpleKeyValueStoreInterface inf;
state std::map<std::string, std::string> store;
inf.connect.makeWellKnownEndpoint(WLTOKEN_SIMPLE_KV_SERVER, TaskPriority::DefaultEndpoint);
loop {
choose {
when(GetKVInterface req = waitNext(inf.connect.getFuture())) {
std::cout << "Received connection attempt\n";
req.reply.send(inf);
}
when(GetRequest req = waitNext(inf.get.getFuture())) {
auto iter = store.find(req.key);
if (iter == store.end()) {
req.reply.sendError(io_error());
} else {
req.reply.send(iter->second);
}
}
when(SetRequest req = waitNext(inf.set.getFuture())) {
store[req.key] = req.value;
req.reply.send(Void());
}
when(ClearRequest req = waitNext(inf.clear.getFuture())) {
auto from = store.lower_bound(req.from);
auto to = store.lower_bound(req.to);
while (from != store.end() && from != to) {
auto next = from;
++next;
store.erase(from);
from = next;
}
req.reply.send(Void());
}
}
}
}
ACTOR Future<SimpleKeyValueStoreInterface> connect() {
std::cout << format("%llu: Connect...\n", uint64_t(g_network->now()));
SimpleKeyValueStoreInterface c;
c.connect = RequestStream<GetKVInterface>(Endpoint::wellKnown({ serverAddress }, WLTOKEN_SIMPLE_KV_SERVER));
SimpleKeyValueStoreInterface result = wait(c.connect.getReply(GetKVInterface()));
std::cout << format("%llu: done..\n", uint64_t(g_network->now()));
return result;
}
ACTOR Future<Void> kvSimpleClient() {
state SimpleKeyValueStoreInterface server = wait(connect());
std::cout << format("Set %s -> %s\n", "foo", "bar");
SetRequest setRequest;
setRequest.key = "foo";
setRequest.value = "bar";
wait(server.set.getReply(setRequest));
GetRequest getRequest;
getRequest.key = "foo";
std::string value = wait(server.get.getReply(getRequest));
std::cout << format("get(%s) -> %s\n", "foo", value.c_str());
return Void();
}
ACTOR Future<Void> kvClient(SimpleKeyValueStoreInterface server, std::shared_ptr<uint64_t> ops) {
state Future<Void> timeout = delay(20);
state int rangeSize = 2 << 12;
loop {
SetRequest setRequest;
setRequest.key = std::to_string(deterministicRandom()->randomInt(0, rangeSize));
setRequest.value = "foo";
wait(server.set.getReply(setRequest));
++(*ops);
try {
GetRequest getRequest;
getRequest.key = std::to_string(deterministicRandom()->randomInt(0, rangeSize));
std::string _ = wait(server.get.getReply(getRequest));
++(*ops);
} catch (Error& e) {
if (e.code() != error_code_io_error) {
throw e;
}
}
int from = deterministicRandom()->randomInt(0, rangeSize);
ClearRequest clearRequest;
clearRequest.from = std::to_string(from);
clearRequest.to = std::to_string(from + 100);
wait(server.clear.getReply(clearRequest));
++(*ops);
if (timeout.isReady()) {
// we are done
return Void();
}
}
}
ACTOR Future<Void> throughputMeasurement(std::shared_ptr<uint64_t> operations) {
loop {
wait(delay(1.0));
std::cout << format("%llu op/s\n", *operations);
*operations = 0;
}
}
ACTOR Future<Void> multipleClients() {
SimpleKeyValueStoreInterface server = wait(connect());
auto ops = std::make_shared<uint64_t>(0);
std::vector<Future<Void>> clients(100);
for (auto& f : clients) {
f = kvClient(server, ops);
}
auto done = waitForAll(clients);
wait(done || throughputMeasurement(ops));
return Void();
}
std::string clusterFile = "fdb.cluster";
ACTOR Future<Void> logThroughput(int64_t* v, Key* next) {
loop {
state int64_t last = *v;
wait(delay(1));
fmt::print("throughput: {} bytes/s, next: {}\n", *v - last, printable(*next).c_str());
}
}
ACTOR Future<Void> fdbClientStream() {
state Database db = Database::createDatabase(clusterFile, 300);
state Transaction tx(db);
state Key next;
state int64_t bytes = 0;
state Future<Void> logFuture = logThroughput(&bytes, &next);
loop {
state PromiseStream<Standalone<RangeResultRef>> results;
try {
state Future<Void> stream = tx.getRangeStream(results,
KeySelector(firstGreaterOrEqual(next), next.arena()),
KeySelector(firstGreaterOrEqual(normalKeys.end)),
GetRangeLimits());
loop {
Standalone<RangeResultRef> range = waitNext(results.getFuture());
if (range.size()) {
bytes += range.expectedSize();
next = keyAfter(range.back().key);
}
}
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
break;
}
wait(tx.onError(e));
}
}
return Void();
}
ACTOR Future<Void> fdbClientGetRange() {
state Database db = Database::createDatabase(clusterFile, 300);
state Transaction tx(db);
state Key next;
state int64_t bytes = 0;
state Future<Void> logFuture = logThroughput(&bytes, &next);
loop {
try {
Standalone<RangeResultRef> range =
wait(tx.getRange(KeySelector(firstGreaterOrEqual(next), next.arena()),
KeySelector(firstGreaterOrEqual(normalKeys.end)),
GetRangeLimits(GetRangeLimits::ROW_LIMIT_UNLIMITED, CLIENT_KNOBS->REPLY_BYTE_LIMIT)));
bytes += range.expectedSize();
if (!range.more) {
break;
}
next = keyAfter(range.back().key);
} catch (Error& e) {
wait(tx.onError(e));
}
}
return Void();
}
ACTOR Future<Void> fdbClient() {
wait(delay(30));
state Database db = Database::createDatabase(clusterFile, 300);
state Transaction tx(db);
state std::string keyPrefix = "/tut/";
state Key startKey;
state KeyRef endKey = "/tut0"_sr;
state int beginIdx = 0;
loop {
try {
tx.reset();
// this workload is stupidly simple:
// 1. select a random key between 1
// and 1e8
// 2. select this key plus the 100
// next ones
// 3. write 10 values in [k, k+100]
beginIdx = deterministicRandom()->randomInt(0, 1e8 - 100);
startKey = keyPrefix + std::to_string(beginIdx);
RangeResult range = wait(tx.getRange(KeyRangeRef(startKey, endKey), 100));
for (int i = 0; i < 10; ++i) {
Key k = Key(keyPrefix + std::to_string(beginIdx + deterministicRandom()->randomInt(0, 100)));
tx.set(k, "foo"_sr);
}
wait(tx.commit());
std::cout << "Committed\n";
wait(delay(2.0));
} catch (Error& e) {
wait(tx.onError(e));
}
}
}
ACTOR Future<Void> fdbStatusStresser() {
state Database db = Database::createDatabase(clusterFile, 300);
state ReadYourWritesTransaction tx(db);
state Key statusJson(std::string("\xff\xff/status/json"));
loop {
try {
tx.reset();
Optional<Value> _ = wait(tx.get(statusJson));
} catch (Error& e) {
wait(tx.onError(e));
}
}
}
std::unordered_map<std::string, std::function<Future<Void>()>> actors = {
{ "timer", &simpleTimer }, // ./tutorial timer
{ "promiseDemo", &promiseDemo }, // ./tutorial promiseDemo
{ "triggerDemo", &triggerDemo }, // ./tutorial triggerDemo
{ "echoServer", &echoServer }, // ./tutorial -p 6666 echoServer
{ "echoClient", &echoClient }, // ./tutorial -s 127.0.0.1:6666 echoClient
{ "kvStoreServer", &kvStoreServer }, // ./tutorial -p 6666 kvStoreServer
{ "kvSimpleClient", &kvSimpleClient }, // ./tutorial -s 127.0.0.1:6666 kvSimpleClient
{ "multipleClients", &multipleClients }, // ./tutorial -s 127.0.0.1:6666 multipleClients
{ "fdbClientStream", &fdbClientStream }, // ./tutorial -C $CLUSTER_FILE_PATH fdbClientStream
{ "fdbClientGetRange", &fdbClientGetRange }, // ./tutorial -C $CLUSTER_FILE_PATH fdbClientGetRange
{ "fdbClient", &fdbClient }, // ./tutorial -C $CLUSTER_FILE_PATH fdbClient
{ "fdbStatusStresser", &fdbStatusStresser }
}; // ./tutorial -C $CLUSTER_FILE_PATH fdbStatusStresser
int main(int argc, char* argv[]) {
bool isServer = false;
std::string port;
std::vector<std::function<Future<Void>()>> toRun;
// parse arguments
for (int i = 1; i < argc; ++i) {
std::string arg(argv[i]);
if (arg == "-p") {
isServer = true;
if (i + 1 >= argc) {
std::cout << "Expecting an argument after -p\n";
return 1;
}
port = std::string(argv[++i]);
continue;
} else if (arg == "-s") {
if (i + 1 >= argc) {
std::cout << "Expecting an argument after -s\n";
return 1;
}
serverAddress = NetworkAddress::parse(argv[++i]);
continue;
} else if (arg == "-C") {
clusterFile = argv[++i];
std::cout << "Using cluster file " << clusterFile << std::endl;
continue;
}
auto actor = actors.find(arg);
if (actor == actors.end()) {
std::cout << format("Error: actor %s does not exist\n", arg.c_str());
return 1;
}
toRun.push_back(actor->second);
}
platformInit();
g_network = newNet2(TLSConfig(), false, true);
FlowTransport::createInstance(!isServer, 0, WLTOKEN_COUNT_IN_TUTORIAL);
NetworkAddress publicAddress = NetworkAddress::parse("0.0.0.0:0");
if (isServer) {
publicAddress = NetworkAddress::parse("0.0.0.0:" + port);
}
// openTraceFile(publicAddress, TRACE_DEFAULT_ROLL_SIZE,
// TRACE_DEFAULT_MAX_LOGS_SIZE);
try {
if (isServer) {
auto listenError = FlowTransport::transport().bind(publicAddress, publicAddress);
if (listenError.isError()) {
listenError.get();
}
}
} catch (Error& e) {
std::cout << format("Error while binding to address (%d): %s\n", e.code(), e.what());
}
// now we start the actors
std::vector<Future<Void>> all;
all.reserve(toRun.size());
for (auto& f : toRun) {
all.emplace_back(f());
}
auto f = stopAfter(waitForAll(all));
g_network->run();
return 0;
}