From 2322571df22623b0bf63764b5f902c8519e3e187 Mon Sep 17 00:00:00 2001 From: mpilman Date: Wed, 7 Aug 2019 08:28:14 -0700 Subject: [PATCH] Ported flow tutorial to fdb 6 --- documentation/CMakeLists.txt | 1 + documentation/tutorial/CMakeLists.txt | 4 + documentation/tutorial/tutorial.actor.cpp | 470 ++++++++++++++++++++++ 3 files changed, 475 insertions(+) create mode 100644 documentation/tutorial/CMakeLists.txt create mode 100644 documentation/tutorial/tutorial.actor.cpp diff --git a/documentation/CMakeLists.txt b/documentation/CMakeLists.txt index ba4b299433..83fabf20ba 100644 --- a/documentation/CMakeLists.txt +++ b/documentation/CMakeLists.txt @@ -1,3 +1,4 @@ +add_subdirectory(tutorial) # build a virtualenv set(sphinx_dir ${CMAKE_CURRENT_SOURCE_DIR}/sphinx) set(venv_dir ${CMAKE_CURRENT_BINARY_DIR}/venv) diff --git a/documentation/tutorial/CMakeLists.txt b/documentation/tutorial/CMakeLists.txt new file mode 100644 index 0000000000..5c5e181625 --- /dev/null +++ b/documentation/tutorial/CMakeLists.txt @@ -0,0 +1,4 @@ +set(TUTORIAL_SRCS tutorial.actor.cpp) + +add_flow_target(EXECUTABLE NAME tutorial SRCS "${TUTORIAL_SRCS}") +target_link_libraries(tutorial PUBLIC fdbclient) diff --git a/documentation/tutorial/tutorial.actor.cpp b/documentation/tutorial/tutorial.actor.cpp new file mode 100644 index 0000000000..bf1d5c58b0 --- /dev/null +++ b/documentation/tutorial/tutorial.actor.cpp @@ -0,0 +1,470 @@ +/* + * fdbcli.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flow/flow.h" +#include "flow/Platform.h" +#include "flow/DeterministicRandom.h" +#include "fdbclient/NativeAPI.actor.h" +#include "fdbclient/ReadYourWrites.h" +#include +#include +#include +#include +#include "flow/actorcompiler.h" + +NetworkAddress serverAddress; + +// this is a simple actor that will report how long +// it is already running once a second. +ACTOR Future simpleTimer() { + // we need to remember the time when we first + // started. + // This needs to be a state-variable because + // we will use it in different parts of the + // actor. If you don't understand how state + // variables work, it is a good idea to remove + // the state keyword here and look at the + // generated C++ code from the actor compiler. + state double start_time = g_network->now(); + loop { + wait(delay(1.0)); + std::cout << format("Time: %.2f\n", g_network->now() - start_time); + } +} + +// A actor that demonstrates how choose-when +// blocks work. +ACTOR Future someFuture(Future ready) { + loop { + choose { + when(wait(delay(0.5))) { std::cout << "Still waiting...\n"; } + when(int r = wait(ready)) { + std::cout << format("Ready %d\n", r); + wait(delay(double(r))); + std::cout << "Done\n"; + return Void(); + } + } + } +} + +ACTOR Future promiseDemo() { + state Promise promise; + state Future f = someFuture(promise.getFuture()); + wait(delay(3.0)); + promise.send(2); + wait(f); + return Void(); +} + +ACTOR Future eventLoop(AsyncTrigger* trigger) { + loop { + + choose { + when(wait(delay(0.5))) { std::cout << "Still waiting...\n"; } + when(wait(trigger->onTrigger())) { std::cout << "Triggered!\n"; } + } + } +} + +ACTOR Future triggerDemo() { + state int runs = 1; + state AsyncTrigger trigger; + state Future triggerLoop = eventLoop(&trigger); + while (++runs < 10) { + wait(delay(1.0)); + std::cout << "trigger.."; + trigger.trigger(); + } + std::cout << "Done."; + return Void(); +} + +struct EchoServerInterface { + constexpr static FileIdentifier file_identifier = 3152015; + RequestStream getInterface; + RequestStream echo; + RequestStream reverse; + + template + void serialize(Ar& ar) { + serializer(ar, echo, reverse); + } +}; + +struct GetInterfaceRequest { + constexpr static FileIdentifier file_identifier = 12004156; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, reply); + } +}; + +struct EchoRequest { + constexpr static FileIdentifier file_identifier = 10624019; + std::string message; + // this variable has to be called reply! + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, message, reply); + } +}; + +struct ReverseRequest { + constexpr static FileIdentifier file_identifier = 10765955; + std::string message; + // this variable has to be called reply! + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, message, reply); + } +}; + +uint64_t tokenCounter = 1; + +ACTOR Future echoServer() { + state EchoServerInterface echoServer; + echoServer.getInterface.makeWellKnownEndpoint(UID(-1, ++tokenCounter), TaskPriority::DefaultEndpoint); + loop { + choose { + when(GetInterfaceRequest req = waitNext(echoServer.getInterface.getFuture())) { + req.reply.send(echoServer); + } + when(EchoRequest req = waitNext(echoServer.echo.getFuture())) { req.reply.send(req.message); } + when(ReverseRequest req = waitNext(echoServer.reverse.getFuture())) { + req.reply.send(std::string(req.message.rbegin(), req.message.rend())); + } + } + } +} + +ACTOR Future echoClient() { + state EchoServerInterface server; + server.getInterface = RequestStream(Endpoint({ serverAddress }, UID(-1, ++tokenCounter))); + EchoServerInterface s = wait(server.getInterface.getReply(GetInterfaceRequest())); + server = s; + EchoRequest echoRequest; + echoRequest.message = "Hello World"; + std::string echoMessage = wait(server.echo.getReply(echoRequest)); + std::cout << format("Sent {} to echo, received %s\n", "Hello World", echoMessage.c_str()); + ReverseRequest reverseRequest; + reverseRequest.message = "Hello World"; + std::string reverseString = wait(server.reverse.getReply(reverseRequest)); + std::cout << format("Sent {} to reverse, received {}\n", "Hello World", reverseString.c_str()); + return Void(); +} + +struct SimpleKeyValueStoreInteface { + constexpr static FileIdentifier file_identifier = 8226647; + RequestStream connect; + RequestStream get; + RequestStream set; + RequestStream clear; + + template + void serialize(Ar& ar) { + serializer(ar, connect, get, set, clear); + } +}; + +struct GetKVInterface { + constexpr static FileIdentifier file_identifier = 8062308; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, reply); + } +}; + +struct GetRequest { + constexpr static FileIdentifier file_identifier = 6983506; + std::string key; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, key, reply); + } +}; + +struct SetRequest { + constexpr static FileIdentifier file_identifier = 7554186; + std::string key; + std::string value; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, key, value, reply); + } +}; + +struct ClearRequest { + constexpr static FileIdentifier file_identifier = 8500026; + std::string from; + std::string to; + ReplyPromise reply; + + template + void serialize(Ar& ar) { + serializer(ar, from, to, reply); + } +}; + +ACTOR Future kvStoreServer() { + state SimpleKeyValueStoreInteface inf; + state std::map store; + inf.connect.makeWellKnownEndpoint(UID(-1, ++tokenCounter), TaskPriority::DefaultEndpoint); + loop { + choose { + when(GetKVInterface req = waitNext(inf.connect.getFuture())) { + std::cout << "Received connection attempt\n"; + req.reply.send(inf); + } + when(GetRequest req = waitNext(inf.get.getFuture())) { + auto iter = store.find(req.key); + if (iter == store.end()) { + req.reply.sendError(io_error()); + } else { + req.reply.send(iter->second); + } + } + when(SetRequest req = waitNext(inf.set.getFuture())) { + store[req.key] = req.value; + req.reply.send(Void()); + } + when(ClearRequest req = waitNext(inf.clear.getFuture())) { + auto from = store.lower_bound(req.from); + auto to = store.lower_bound(req.to); + while (from != store.end() && from != to) { + auto next = from; + ++next; + store.erase(from); + from = next; + } + req.reply.send(Void()); + } + } + } +} + +ACTOR Future connect() { + std::cout << format("%ull: Connect...\n", uint64_t(g_network->now())); + SimpleKeyValueStoreInteface c; + c.connect = RequestStream(Endpoint({ serverAddress }, UID(-1, ++tokenCounter))); + SimpleKeyValueStoreInteface result = wait(c.connect.getReply(GetKVInterface())); + std::cout << format("%ull: done..\n", uint64_t(g_network->now())); + return result; +} + +ACTOR Future kvSimpleClient() { + state SimpleKeyValueStoreInteface server = wait(connect()); + std::cout << format("Set %s -> %s\n", "foo", "bar"); + SetRequest setRequest; + setRequest.key = "foo"; + setRequest.value = "bar"; + wait(server.set.getReply(setRequest)); + GetRequest getRequest; + getRequest.key = "foo"; + std::string value = wait(server.get.getReply(getRequest)); + std::cout << format("get(%s) -> %s\n", "foo", value.c_str()); + return Void(); +} + +ACTOR Future kvClient(SimpleKeyValueStoreInteface server, std::shared_ptr ops) { + state Future timeout = delay(20); + state int rangeSize = 2 << 12; + loop { + SetRequest setRequest; + setRequest.key = std::to_string(deterministicRandom()->randomInt(0, rangeSize)); + setRequest.value = "foo"; + wait(server.set.getReply(setRequest)); + ++(*ops); + try { + GetRequest getRequest; + getRequest.key = std::to_string(deterministicRandom()->randomInt(0, rangeSize)); + std::string _ = wait(server.get.getReply(getRequest)); + ++(*ops); + } catch (Error& e) { + if (e.code() != error_code_io_error) { + throw e; + } + } + int from = deterministicRandom()->randomInt(0, rangeSize); + ClearRequest clearRequest; + clearRequest.from = std::to_string(from); + clearRequest.to = std::to_string(from + 100); + wait(server.clear.getReply(clearRequest)); + ++(*ops); + if (timeout.isReady()) { + // we are done + return Void(); + } + } +} + +ACTOR Future throughputMeasurement(std::shared_ptr operations) { + loop { + wait(delay(1.0)); + std::cout << format("%ull op/s\n", *operations); + *operations = 0; + } +} + +ACTOR Future multipleClients() { + SimpleKeyValueStoreInteface server = wait(connect()); + auto ops = std::make_shared(0); + std::vector> clients(100); + for (auto& f : clients) { + f = kvClient(server, ops); + } + auto done = waitForAll(clients); + wait(done || throughputMeasurement(ops)); + return Void(); +} + +std::string clusterFile = "fdb.cluster"; + +ACTOR Future fdbClient() { + wait(delay(30)); + state Database db = Database::createDatabase(clusterFile, 300); + state Transaction tx(db); + state std::string keyPrefix = "/tut/"; + state Key startKey; + state KeyRef endKey = LiteralStringRef("/tut0"); + state int beginIdx = 0; + loop { + try { + tx.reset(); + // this workload is stupidly simple: + // 1. select a random key between 1 + // and 1e8 + // 2. select this key plus the 100 + // next ones + // 3. write 10 values in [k, k+100] + beginIdx = deterministicRandom()->randomInt(0, 1e8 - 100); + startKey = keyPrefix + std::to_string(beginIdx); + Standalone range = wait(tx.getRange(KeyRangeRef(startKey, endKey), 100)); + for (int i = 0; i < 10; ++i) { + Key k = Key(keyPrefix + std::to_string(beginIdx + deterministicRandom()->randomInt(0, 100))); + tx.set(k, LiteralStringRef("foo")); + } + wait(tx.commit()); + std::cout << "Committed\n"; + wait(delay(2.0)); + } catch (Error& e) { + wait(tx.onError(e)); + } + } +} + +ACTOR Future fdbStatusStresser() { + state Database db = Database::createDatabase(clusterFile, 300); + state ReadYourWritesTransaction tx(db); + state Key statusJson(std::string("\xff\xff/status/json")); + loop { + try { + tx.reset(); + Optional _ = wait(tx.get(statusJson)); + } catch (Error& e) { + wait(tx.onError(e)); + } + } +} + +std::unordered_map()>> actors = { { "timer", &simpleTimer }, + { "promiseDemo", &promiseDemo }, + { "triggerDemo", &triggerDemo }, + { "echoServer", &echoServer }, + { "echoClient", &echoClient }, + { "kvStoreServer", &kvStoreServer }, + { "kvSimpleClient", &kvSimpleClient }, + { "multipleClients", &multipleClients }, + { "fdbClient", &fdbClient }, + { "fdbStatusStresser", &fdbStatusStresser } }; + +int main(int argc, char* argv[]) { + bool isServer = false; + std::string port; + std::vector()>> toRun; + // parse arguments + for (int i = 1; i < argc; ++i) { + std::string arg(argv[i]); + if (arg == "-p") { + isServer = true; + if (i + 1 >= argc) { + std::cout << "Excpecting an argument after -p\n"; + return 1; + } + port = std::string(argv[++i]); + continue; + } else if (arg == "-s") { + if (i + 1 >= argc) { + std::cout << "Excpecting an argument after -s\n"; + return 1; + } + serverAddress = NetworkAddress::parse(argv[++i]); + continue; + } else if (arg == "-C") { + clusterFile = argv[++i]; + std::cout << "Using cluster file " << clusterFile << std::endl; + continue; + } + auto actor = actors.find(arg); + if (actor == actors.end()) { + std::cout << format("Error: actor %s does not exist\n", arg.c_str()); + return 1; + } + toRun.push_back(actor->second); + } + platformInit(); + g_network = newNet2(false, true); + NetworkAddress publicAddress = NetworkAddress::parse("0.0.0.0:0"); + if (isServer) { + publicAddress = NetworkAddress::parse("0.0.0.0:" + port); + } + // openTraceFile(publicAddress, TRACE_DEFAULT_ROLL_SIZE, + // TRACE_DEFAULT_MAX_LOGS_SIZE); + try { + if (isServer) { + auto listenError = FlowTransport::transport().bind(publicAddress, publicAddress); + if (listenError.isError()) { + listenError.get(); + } + } + } catch (Error& e) { + std::cout << format("Error while binding to address (%d): %s\n", e.code(), e.what()); + } + // now we start the actors + std::vector> all; + for (auto& f : toRun) { + all.emplace_back(f()); + } + auto f = stopAfter(waitForAll(all)); + g_network->run(); + return 0; +}