/*
 * flow.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "flow/flow.h"
#include "flow/DeterministicRandom.h"
#include "flow/UnitTest.h"
#include "flow/rte_memcpy.h"
#include "flow/folly_memcpy.h"
#include <stdarg.h> // va_list, va_copy
#include <cinttypes> // SCNx64, PRIx64
#include <cerrno> // errno, ERANGE

#if (defined(__linux__) || defined(__FreeBSD__)) && defined(__AVX__) && !defined(MEMORY_SANITIZER)
// For benchmarking; need a version of rte_memcpy that doesn't live in the same compilation unit as the test.
void* rte_memcpy_noinline(void* __restrict __dest, const void* __restrict __src, size_t __n) {
	return rte_memcpy(__dest, __src, __n);
}

// This compilation unit will be linked in to the main binary, so this should override glibc memcpy
__attribute__((visibility("default"))) void* memcpy(void* __restrict __dest, const void* __restrict __src, size_t __n) {
	// folly_memcpy is faster for small copies, but rte seems to win out in most other circumstances
	return rte_memcpy(__dest, __src, __n);
}
#else
// Fallback for builds where the rte_memcpy override above is not compiled in: forwards to the regular memcpy.
void* rte_memcpy_noinline(void* __restrict __dest, const void* __restrict __src, size_t __n) {
	return memcpy(__dest, __src, __n);
}
#endif // (defined (__linux__) || defined (__FreeBSD__)) && defined(__AVX__) && !defined(MEMORY_SANITIZER)

INetwork* g_network = nullptr;

FILE* randLog = nullptr;
// Per-thread generator handed out by deterministicRandom().
thread_local Reference<IRandom> seededRandom;
// Generator handed out by debugRandom(). Note that, unlike seededRandom, this is NOT thread_local.
Reference<IRandom> seededDebugRandom;
uint64_t debug_lastLoadBalanceResultEndpointToken = 0;
bool noUnseed = false;

// Replaces both the calling thread's seededRandom and the global seededDebugRandom with fresh
// DeterministicRandom instances built from `seed`.
// NOTE(review): the meaning of the extra `true` passed for seededRandom is defined by DeterministicRandom's
// constructor, which is not visible in this file.
void setThreadLocalDeterministicRandomSeed(uint32_t seed) {
	seededRandom = Reference<IRandom>(new DeterministicRandom(seed, true));
	seededDebugRandom = Reference<IRandom>(new DeterministicRandom(seed));
}

// Returns seededDebugRandom as-is; it is unset until setThreadLocalDeterministicRandomSeed() has been called.
Reference<IRandom> debugRandom() {
	return seededDebugRandom;
}

// Returns the calling thread's seededRandom, creating it from platform::getRandomSeed() on first use if no seed
// was installed through setThreadLocalDeterministicRandomSeed().
Reference<IRandom> deterministicRandom() {
	if (!seededRandom) {
		seededRandom = Reference<IRandom>(new DeterministicRandom(platform::getRandomSeed(), true));
	}
	return seededRandom;
}

// Returns a per-thread generator that is always seeded from platform::getRandomSeed(); it is created lazily and is
// unaffected by setThreadLocalDeterministicRandomSeed().
Reference<IRandom> nondeterministicRandom() {
	static thread_local Reference<IRandom> random;
	if (!random) {
		random = Reference<IRandom>(new DeterministicRandom(platform::getRandomSeed()));
	}
	return random;
}

// Formats both 64-bit parts as 32 zero-padded lowercase hex digits.
// PRIx64 is used (rather than "llx") so the conversion matches uint64_t on every platform, mirroring the SCNx64
// used by fromString().
std::string UID::toString() const {
	return format("%016" PRIx64 "%016" PRIx64, part[0], part[1]);
}

// Parses the 32 hex digit representation produced by toString().
// Asserts that `s` is exactly 32 characters long and that both halves were parsed.
UID UID::fromString(std::string const& s) {
	ASSERT(s.size() == 32);
	uint64_t a = 0, b = 0;
	int r = sscanf(s.c_str(), "%16" SCNx64 "%16" SCNx64, &a, &b);
	ASSERT(r == 2);
	return UID(a, b);
}

// Formats only the first 64-bit part as 16 zero-padded lowercase hex digits.
std::string UID::shortString() const {
	return format("%016" PRIx64, part[0]);
}

void detectFailureAfter(int const& address, double const& delay);

namespace {

// One accepted unit suffix and the factor that converts a count expressed in that unit into the base unit.
struct UnitFactor {
	const char* suffix;
	uint64_t factor;
};

const UnitFactor byteUnitFactors[] = {
	{ "B", 1ULL },
	{ "KB", 1000ULL },
	{ "KiB", 1ULL << 10 },
	{ "MB", 1000000ULL },
	{ "MiB", 1ULL << 20 },
	{ "GB", 1000000000ULL },
	{ "GiB", 1ULL << 30 },
	{ "TB", 1000000000000ULL },
	{ "TiB", 1ULL << 40 },
};

const UnitFactor durationUnitFactors[] = {
	{ "s", 1ULL },
	{ "m", 60ULL },
	{ "h", 60ULL * 60ULL },
	{ "d", 24ULL * 60ULL * 60ULL },
};

// Parses "<base-10 count><unit>" and returns the count multiplied by the factor of the matching entry in `units`.
// When `text` carries no unit, `defaultUnit` is used instead; if that is empty too the parse fails.
// Returns an empty Optional when there are no digits, the count is negative, the unit is unknown, or the count or
// the scaled result does not fit in a uint64_t.
template <size_t N>
Optional<uint64_t> parseCountWithUnit(std::string const& text,
                                      std::string const& defaultUnit,
                                      const UnitFactor (&units)[N]) {
	// strtoull() accepts a leading '-' and returns the negated value converted to unsigned, which would turn
	// e.g. "-1B" into a huge number. Skip the same leading whitespace strtoull() skips and reject a minus sign.
	const size_t firstChar = text.find_first_not_of(" \t\n\v\f\r");
	if (firstChar != std::string::npos && text[firstChar] == '-') {
		return Optional<uint64_t>();
	}

	char* endptr = nullptr;
	errno = 0; // strtoull() reports out-of-range input only through errno
	const uint64_t count = strtoull(text.c_str(), &endptr, 10);
	if (endptr == text.c_str() || errno == ERANGE) {
		return Optional<uint64_t>();
	}

	std::string unit;
	if (*endptr == '\0') {
		if (defaultUnit.empty()) {
			return Optional<uint64_t>();
		}
		unit = defaultUnit;
	} else {
		unit = endptr;
	}

	for (const UnitFactor& candidate : units) {
		if (unit == candidate.suffix) {
			// Refuse to wrap around instead of returning a silently truncated value.
			if (count > UINT64_MAX / candidate.factor) {
				return Optional<uint64_t>();
			}
			return Optional<uint64_t>(count * candidate.factor);
		}
	}
	return Optional<uint64_t>();
}

// Returns the index of the last byte of `str` that is not 0xff, or -1 if every byte is 0xff (or `str` is empty).
int lastNonFFIndex(StringRef const& str) {
	int index = str.size() - 1;
	while (index >= 0 && str[index] == 255) {
		index--;
	}
	return index;
}

} // namespace

// Parses a byte count with one of the following suffixes and returns the count in bytes
// B
// KB, MB, GB, TB     - powers of 1000
// KiB, MiB, GiB, TiB - powers of 1024
// `default_unit` is applied when `toparse` has no suffix. Returns an empty Optional on malformed, negative or
// overflowing input.
Optional<uint64_t> parse_with_suffix(std::string const& toparse, std::string const& default_unit) {
	return parseCountWithUnit(toparse, default_unit, byteUnitFactors);
}

// Parses a duration with one of the following suffixes and returns the duration in seconds
// s - seconds
// m - minutes
// h - hours
// d - days
// `defaultUnit` is applied when `str` has no suffix. Returns an empty Optional on malformed, negative or
// overflowing input.
Optional<uint64_t> parseDuration(std::string const& str, std::string const& defaultUnit) {
	return parseCountWithUnit(str, defaultUnit, durationUnitFactors);
}

// printf-style formatting into `outputString`.
// Returns the number of characters written (excluding the terminator), or -1 if formatting failed.
// `args` is consumed at most once by this function; the first attempt works on a copy.
int vsformat(std::string& outputString, const char* form, va_list args) {
	char buf[200];

	// First try a small stack buffer; most results fit and need no second pass.
	va_list args2;
	va_copy(args2, args);
	int size = vsnprintf(buf, sizeof(buf), form, args2);
	va_end(args2);

	if (size >= 0 && static_cast<size_t>(size) < sizeof(buf)) {
		outputString.assign(buf, static_cast<size_t>(size));
		return size;
	}

#ifdef _WIN32
	// Microsoft's non-standard vsnprintf doesn't return a correct size, but just an error, so determine the necessary
	// size
	va_copy(args2, args);
	size = _vscprintf(form, args2);
	va_end(args2);
#endif

	if (size < 0) {
		return -1;
	}

	TEST(true); // large format result

	// `size` is the required length; format again directly into the string, with room for the terminator.
	outputString.resize(static_cast<size_t>(size) + 1);
	size = vsnprintf(&outputString[0], outputString.size(), form, args);
	if (size < 0 || static_cast<size_t>(size) >= outputString.size()) {
		return -1;
	}

	outputString.resize(static_cast<size_t>(size));
	return size;
}

// printf-style formatting that returns the result as a std::string. Asserts that formatting succeeded.
std::string format(const char* form, ...) {
	va_list args;
	va_start(args, form);

	std::string str;
	int result = vsformat(str, form, args);
	va_end(args);

	ASSERT(result >= 0);
	return str;
}

// Returns a copy of `str` with trailing 0xff bytes removed and the last remaining byte incremented by one.
Standalone<StringRef> strinc(StringRef const& str) {
	const int index = lastNonFFIndex(str);

	// Must not be called with a string that consists only of zero or more '\xff' bytes.
	ASSERT(index >= 0);

	Standalone<StringRef> r = str.substr(0, index + 1);
	uint8_t* p = mutateString(r);
	p[r.size() - 1]++;

	return r;
}

// Same as strinc(str), but the result is allocated in `arena`.
StringRef strinc(StringRef const& str, Arena& arena) {
	const int index = lastNonFFIndex(str);

	// Must not be called with a string that consists only of zero or more '\xff' bytes.
	ASSERT(index >= 0);

	StringRef r(arena, str.substr(0, index + 1));
	uint8_t* p = mutateString(r);
	p[r.size() - 1]++;

	return r;
}

// Returns, allocated in `arena`, the bytes of `str` followed by 10 zero bytes and then a 4-byte copy of str.size()
// (copied from an int32_t, i.e. in host byte order).
StringRef addVersionStampAtEnd(StringRef const& str, Arena& arena) {
	int32_t size = str.size();
	uint8_t* s = new (arena) uint8_t[size + 14];
	memcpy(s, str.begin(), size);
	memset(&s[size], 0, 10);
	memcpy(&s[size + 10], &size, 4);
	return StringRef(s, size + 14);
}

// Same as addVersionStampAtEnd(str, arena), with the result owning its own arena.
Standalone<StringRef> addVersionStampAtEnd(StringRef const& str) {
	Standalone<StringRef> r;
	((StringRef&)r) = addVersionStampAtEnd(str, r.arena());
	return r;
}

namespace {

// Indexed by int(BuggifyType): whether buggify is switched on for that type.
std::vector<bool> buggifyActivated{ false, false };
// Per BuggifyType: the remembered activation decision (0 or 1) for every (file, line) seen by getSBVar().
std::map<BuggifyType, std::map<std::pair<std::string, int>, int>> typedSBVars;

} // namespace

std::vector<double> P_BUGGIFIED_SECTION_ACTIVATED{ .25, .25 };
std::vector<double> P_BUGGIFIED_SECTION_FIRES{ .25, .25 };

double P_EXPENSIVE_VALIDATION = .05;

// Returns the activation decision for the buggify section at (file, line) for `type`.
// Always 0 while buggify is disabled for `type`. Otherwise the decision is drawn once, with probability
// P_BUGGIFIED_SECTION_ACTIVATED[type], recorded in g_traceBatch, and then remembered until
// clearBuggifySections(type) is called.
int getSBVar(std::string const& file, int line, BuggifyType type) {
	if (!buggifyActivated[int(type)])
		return 0;

	auto& sectionVars = typedSBVars[type];
	const auto sectionKey = std::make_pair(file, line);
	auto it = sectionVars.find(sectionKey);
	if (it == sectionVars.end()) {
		const int activated = deterministicRandom()->random01() < P_BUGGIFIED_SECTION_ACTIVATED[int(type)];
		it = sectionVars.emplace(sectionKey, activated).first;
		g_traceBatch.addBuggify(activated, line, file);
		if (g_network)
			g_traceBatch.dump();
	}

	return it->second;
}

// Forgets every remembered activation decision for `type`.
void clearBuggifySections(BuggifyType type) {
	typedSBVars[type].clear();
}

// Currently reports the same flag as isBuggifyEnabled().
bool validationIsEnabled(BuggifyType type) {
	return buggifyActivated[int(type)];
}

// Whether buggify is switched on for `type`.
bool isBuggifyEnabled(BuggifyType type) {
	return buggifyActivated[int(type)];
}

// Switches buggify on or off for `type`. Previously remembered section decisions are left untouched.
void enableBuggify(bool enabled, BuggifyType type) {
	buggifyActivated[int(type)] = enabled;
}

namespace {
// Simple message for flatbuffers unittests
struct Int {
	constexpr static FileIdentifier file_identifier = 12345;
	uint32_t value;
	Int() = default;
	Int(uint32_t value) : value(value) {}

	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, value);
	}
};
} // namespace

// Round-trips an ErrorOr<Int> through ObjectWriter / ArenaObjectReader, once holding an error and once a value.
TEST_CASE("/flow/FlatBuffers/ErrorOr") {
	{
		ErrorOr<Int> in(worker_removed());
		ErrorOr<Int> out;
		ObjectWriter writer(Unversioned());
		writer.serialize(in);
		Standalone<StringRef> copy = writer.toStringRef();
		ArenaObjectReader reader(copy.arena(), copy, Unversioned());
		reader.deserialize(out);
		ASSERT(out.isError());
		ASSERT(out.getError().code() == in.getError().code());
	}
	{
		ErrorOr<Int> in(deterministicRandom()->randomUInt32());
		ErrorOr<Int> out;
		ObjectWriter writer(Unversioned());
		writer.serialize(in);
		Standalone<StringRef> copy = writer.toStringRef();
		ArenaObjectReader reader(copy.arena(), copy, Unversioned());
		reader.deserialize(out);
		ASSERT(!out.isError());
		ASSERT(out.get().value == in.get().value);
	}
	return Void();
}

// Round-trips an Optional<Int> through ObjectWriter / ArenaObjectReader, once empty and once holding a value.
TEST_CASE("/flow/FlatBuffers/Optional") {
	{
		Optional<Int> in;
		Optional<Int> out;
		ObjectWriter writer(Unversioned());
		writer.serialize(in);
		Standalone<StringRef> copy = writer.toStringRef();
		ArenaObjectReader reader(copy.arena(), copy, Unversioned());
		reader.deserialize(out);
		ASSERT(!out.present());
	}
	{
		Optional<Int> in(deterministicRandom()->randomUInt32());
		Optional<Int> out;
		ObjectWriter writer(Unversioned());
		writer.serialize(in);
		Standalone<StringRef> copy = writer.toStringRef();
		ArenaObjectReader reader(copy.arena(), copy, Unversioned());
		reader.deserialize(out);
		ASSERT(out.present());
		ASSERT(out.get().value == in.get().value);
	}
	return Void();
}

// Checks that a string written as Standalone<StringRef> can be read back as StringRef and vice versa.
TEST_CASE("/flow/FlatBuffers/Standalone") {
	{
		Standalone<StringRef> in(std::string("foobar"));
		StringRef out;
		ObjectWriter writer(Unversioned());
		writer.serialize(in);
		Standalone<StringRef> copy = writer.toStringRef();
		ArenaObjectReader reader(copy.arena(), copy, Unversioned());
		reader.deserialize(out);
		ASSERT(in == out);
	}
	{
		StringRef in = LiteralStringRef("foobar");
		Standalone<StringRef> out;
		ObjectWriter writer(Unversioned());
		writer.serialize(in);
		Standalone<StringRef> copy = writer.toStringRef();
		ArenaObjectReader reader(copy.arena(), copy, Unversioned());
		reader.deserialize(out);
		ASSERT(in == out);
	}
	return Void();
}