/*
 * ManagementAPI.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <string>
#include <vector>

#include "flow/Arena.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/StatusClient.h"
#include "flow/UnitTest.h"
#include "fdbrpc/ReplicationPolicy.h"
#include "fdbrpc/Replication.h"

#include "flow/actorcompiler.h" // This must be the last #include.

bool isInteger(const std::string& s) {
	if (s.empty()) return false;
	char* p;
	strtol(s.c_str(), &p, 10);
	return (*p == 0);
}

// Defines the mapping between configuration names (as exposed by fdbcli, buildConfiguration()) and actual
// configuration parameters
std::map<std::string, std::string> configForToken(std::string const& mode) {
	std::map<std::string, std::string> out;
	std::string p = configKeysPrefix.toString();

	if (mode == "new") {
		out[p + "initialized"] = "1";
		return out;
	}

	if (mode == "locked") {
		// Setting this key is interpreted as an instruction to use the normal version-stamp-based mechanism for
		// locking the database.
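		// Note: the value stored here is only a placeholder lock UID; changeConfig() below parses it and writes
		// the actual versionstamped lock value when the new database is created.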
		out[databaseLockedKey.toString()] = deterministicRandom()->randomUniqueID().toString();
		return out;
	}

	size_t pos;

	// key:=value is unvalidated and unchecked
	pos = mode.find(":=");
	if (pos != std::string::npos) {
		out[p + mode.substr(0, pos)] = mode.substr(pos + 2);
		return out;
	}

	// key=value is constrained to a limited set of options and basic validation is performed
	pos = mode.find("=");
	if (pos != std::string::npos) {
		std::string key = mode.substr(0, pos);
		std::string value = mode.substr(pos + 1);

		if ((key == "logs" || key == "proxies" || key == "resolvers" || key == "remote_logs" ||
		     key == "log_routers" || key == "usable_regions" || key == "repopulate_anti_quorum") &&
		    isInteger(value)) {
			out[p + key] = value;
		}

		if (key == "regions") {
			json_spirit::mValue mv;
			json_spirit::read_string(value, mv);

			StatusObject regionObj;
			regionObj["regions"] = mv;
			out[p + key] =
			    BinaryWriter::toValue(regionObj, IncludeVersion(ProtocolVersion::withRegionConfiguration())).toString();
		}

		return out;
	}

	Optional<KeyValueStoreType> logType;
	Optional<KeyValueStoreType> storeType;
	if (mode == "ssd-1") {
		logType = KeyValueStoreType::SSD_BTREE_V1;
		storeType = KeyValueStoreType::SSD_BTREE_V1;
	} else if (mode == "ssd" || mode == "ssd-2") {
		logType = KeyValueStoreType::SSD_BTREE_V2;
		storeType = KeyValueStoreType::SSD_BTREE_V2;
	} else if (mode == "ssd-redwood-experimental") {
		logType = KeyValueStoreType::SSD_BTREE_V2;
		storeType = KeyValueStoreType::SSD_REDWOOD_V1;
	} else if (mode == "ssd-rocksdb-experimental") {
		logType = KeyValueStoreType::SSD_BTREE_V2;
		storeType = KeyValueStoreType::SSD_ROCKSDB_V1;
	} else if (mode == "memory" || mode == "memory-2") {
		logType = KeyValueStoreType::SSD_BTREE_V2;
		storeType = KeyValueStoreType::MEMORY;
	} else if (mode == "memory-1") {
		logType = KeyValueStoreType::MEMORY;
		storeType = KeyValueStoreType::MEMORY;
	} else if (mode == "memory-radixtree-beta") {
		logType = KeyValueStoreType::SSD_BTREE_V2;
		storeType = KeyValueStoreType::MEMORY_RADIXTREE;
	}
	// Add any new store types to fdbserver/workloads/ConfigureDatabase, too

	if (storeType.present()) {
		out[p + "log_engine"] = format("%d", logType.get().storeType());
		out[p + "storage_engine"] = format("%d", KeyValueStoreType::StoreType(storeType.get()));
		return out;
	}

	std::string redundancy, log_replicas;
	Reference<IReplicationPolicy> storagePolicy;
	Reference<IReplicationPolicy> tLogPolicy;

	bool redundancySpecified = true;
	if (mode == "single") {
		redundancy = "1";
		log_replicas = "1";
		storagePolicy = tLogPolicy = Reference<IReplicationPolicy>(new PolicyOne());
	} else if (mode == "double" || mode == "fast_recovery_double") {
		redundancy = "2";
		log_replicas = "2";
		storagePolicy = tLogPolicy = Reference<IReplicationPolicy>(
		    new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
	} else if (mode == "triple" || mode == "fast_recovery_triple") {
		redundancy = "3";
		log_replicas = "3";
		storagePolicy = tLogPolicy = Reference<IReplicationPolicy>(
		    new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
	} else if (mode == "three_datacenter" || mode == "multi_dc") {
		redundancy = "6";
		log_replicas = "4";
		storagePolicy = Reference<IReplicationPolicy>(new PolicyAcross(
		    3, "dcid",
		    Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))));
		tLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(
		    2, "dcid",
		    Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))));
	} else if (mode == "three_datacenter_fallback") {
		redundancy = "4";
		log_replicas = "4";
		storagePolicy = tLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(
		    2, "dcid",
		    Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))));
	} else if (mode == "three_data_hall") {
		redundancy = "3";
		log_replicas = "4";
		storagePolicy = Reference<IReplicationPolicy>(
		    new PolicyAcross(3, "data_hall", Reference<IReplicationPolicy>(new PolicyOne())));
		tLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(
		    2, "data_hall",
		    Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))));
	} else if (mode == "three_data_hall_fallback") {
		redundancy = "2";
		log_replicas = "4";
		storagePolicy = Reference<IReplicationPolicy>(
		    new PolicyAcross(2, "data_hall", Reference<IReplicationPolicy>(new PolicyOne())));
		tLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(
		    2, "data_hall",
		    Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))));
	} else
		redundancySpecified = false;

	if (redundancySpecified) {
		out[p + "storage_replicas"] = redundancy;
		out[p + "log_replicas"] = log_replicas;
		out[p + "log_anti_quorum"] = "0";

		BinaryWriter policyWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy()));
		serializeReplicationPolicy(policyWriter, storagePolicy);
		out[p + "storage_replication_policy"] = policyWriter.toValue().toString();

		policyWriter = BinaryWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy()));
		serializeReplicationPolicy(policyWriter, tLogPolicy);
		out[p + "log_replication_policy"] = policyWriter.toValue().toString();
		return out;
	}

	std::string remote_redundancy, remote_log_replicas;
	Reference<IReplicationPolicy> remoteTLogPolicy;
	bool remoteRedundancySpecified = true;
	if (mode == "remote_default") {
		remote_redundancy = "0";
		remote_log_replicas = "0";
		remoteTLogPolicy = Reference<IReplicationPolicy>();
	} else if (mode == "remote_single") {
		remote_redundancy = "1";
		remote_log_replicas = "1";
		remoteTLogPolicy = Reference<IReplicationPolicy>(new PolicyOne());
	} else if (mode == "remote_double") {
		remote_redundancy = "2";
		remote_log_replicas = "2";
		remoteTLogPolicy = Reference<IReplicationPolicy>(
		    new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
	} else if (mode == "remote_triple") {
		remote_redundancy = "3";
		remote_log_replicas = "3";
		remoteTLogPolicy = Reference<IReplicationPolicy>(
		    new PolicyAcross(3, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
	} else if (mode == "remote_three_data_hall") { // FIXME: not tested in simulation
		remote_redundancy = "3";
		remote_log_replicas = "4";
		remoteTLogPolicy = Reference<IReplicationPolicy>(new PolicyAcross(
		    2, "data_hall",
		    Reference<IReplicationPolicy>(new PolicyAcross(2, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())))));
	} else
		remoteRedundancySpecified = false;

	if (remoteRedundancySpecified) {
		out[p + "remote_log_replicas"] = remote_log_replicas;

		BinaryWriter policyWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy()));
		serializeReplicationPolicy(policyWriter, remoteTLogPolicy);
		out[p + "remote_log_policy"] = policyWriter.toValue().toString();
		return out;
	}

	return out;
}

ConfigurationResult::Type buildConfiguration(std::vector<StringRef> const& modeTokens,
                                             std::map<std::string, std::string>& outConf) {
	for (auto it : modeTokens) {
		std::string mode = it.toString();
		auto m = configForToken(mode);
		if (!m.size()) {
			TraceEvent(SevWarnAlways, "UnknownOption").detail("Option", mode);
			return ConfigurationResult::UNKNOWN_OPTION;
		}

		for (auto t = m.begin(); t != m.end(); ++t) {
			if (outConf.count(t->first)) {
				TraceEvent(SevWarnAlways, "ConflictingOption").detail("Option", t->first);
				return ConfigurationResult::CONFLICTING_OPTIONS;
			}
			outConf[t->first] = t->second;
		}
	}

	auto p = configKeysPrefix.toString();
	if (!outConf.count(p + "storage_replication_policy") && outConf.count(p + "storage_replicas")) {
		int storageCount = stoi(outConf[p + "storage_replicas"]);
		Reference<IReplicationPolicy> storagePolicy = Reference<IReplicationPolicy>(
		    new PolicyAcross(storageCount, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
		BinaryWriter policyWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy()));
		serializeReplicationPolicy(policyWriter, storagePolicy);
		outConf[p + "storage_replication_policy"] = policyWriter.toValue().toString();
	}
	if (!outConf.count(p + "log_replication_policy") && outConf.count(p + "log_replicas")) {
		int logCount = stoi(outConf[p + "log_replicas"]);
		Reference<IReplicationPolicy> logPolicy = Reference<IReplicationPolicy>(
		    new PolicyAcross(logCount, "zoneid", Reference<IReplicationPolicy>(new PolicyOne())));
		BinaryWriter policyWriter(IncludeVersion(ProtocolVersion::withReplicationPolicy()));
		serializeReplicationPolicy(policyWriter, logPolicy);
		outConf[p + "log_replication_policy"] = policyWriter.toValue().toString();
	}
	return ConfigurationResult::SUCCESS;
}

ConfigurationResult::Type buildConfiguration(std::string const& configMode,
                                             std::map<std::string, std::string>& outConf) {
	std::vector<StringRef> modes;
	int p = 0;
	while (p < configMode.size()) {
		int end = configMode.find_first_of(' ', p);
		if (end == configMode.npos) end = configMode.size();
		modes.push_back(StringRef(configMode).substr(p, end - p));
		p = end + 1;
	}

	return buildConfiguration(modes, outConf);
}

bool isCompleteConfiguration(std::map<std::string, std::string> const& options) {
	std::string p = configKeysPrefix.toString();

	return options.count(p + "log_replicas") == 1 && options.count(p + "log_anti_quorum") == 1 &&
	       options.count(p + "storage_replicas") == 1 && options.count(p + "log_engine") == 1 &&
	       options.count(p + "storage_engine") == 1;
}

ACTOR Future<DatabaseConfiguration> getDatabaseConfiguration(Database cx) {
	state Transaction tr(cx);
	loop {
		try {
			tr.setOption(FDBTransactionOptions::LOCK_AWARE);
			Standalone<RangeResultRef> res = wait(tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY));
			ASSERT(res.size() < CLIENT_KNOBS->TOO_MANY);
			DatabaseConfiguration config;
			config.fromKeyValues((VectorRef<KeyValueRef>)res);
			return config;
		} catch (Error& e) {
			wait(tr.onError(e));
		}
	}
}

ACTOR Future<ConfigurationResult::Type> changeConfig(Database cx, std::map<std::string, std::string> m, bool force) {
	state StringRef initIdKey = LiteralStringRef("\xff/init_id");
	state Transaction tr(cx);

	if (!m.size()) {
		return ConfigurationResult::NO_OPTIONS_PROVIDED;
	}

	// make sure we have essential configuration options
	std::string initKey = configKeysPrefix.toString() + "initialized";
	state bool creating = m.count(initKey) != 0;
	state Optional<UID> locked;
	{
		auto iter = m.find(databaseLockedKey.toString());
		if (iter != m.end()) {
			if (!creating) {
				return ConfigurationResult::LOCKED_NOT_NEW;
			}
			locked = UID::fromString(iter->second);
			m.erase(iter);
		}
	}
	if (creating) {
		m[initIdKey.toString()] = deterministicRandom()->randomUniqueID().toString();
		if (!isCompleteConfiguration(m)) {
			return ConfigurationResult::INCOMPLETE_CONFIGURATION;
		}
	}

	state Future<Void> tooLong = delay(60);
	state Key versionKey = BinaryWriter::toValue(deterministicRandom()->randomUniqueID(), Unversioned());
	state bool oldReplicationUsesDcId = false;
	loop {
		try {
			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
			tr.setOption(FDBTransactionOptions::LOCK_AWARE);
			tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);

			if (!creating && !force) {
				state Future<Standalone<RangeResultRef>> fConfig = tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY);
				state Future<vector<ProcessData>> fWorkers = getWorkers(&tr);
				wait(success(fConfig) || tooLong);

				if (!fConfig.isReady()) {
					return ConfigurationResult::DATABASE_UNAVAILABLE;
				}

				if (fConfig.isReady()) {
					ASSERT(fConfig.get().size() < CLIENT_KNOBS->TOO_MANY);
					state DatabaseConfiguration oldConfig;
					oldConfig.fromKeyValues((VectorRef<KeyValueRef>)fConfig.get());
					state DatabaseConfiguration newConfig = oldConfig;
					for (auto kv : m) {
						newConfig.set(kv.first, kv.second);
					}
					if (!newConfig.isValid()) {
						return ConfigurationResult::INVALID_CONFIGURATION;
					}
					if (newConfig.tLogPolicy->attributeKeys().count("dcid") && newConfig.regions.size() > 0) {
						return ConfigurationResult::REGION_REPLICATION_MISMATCH;
					}
					oldReplicationUsesDcId = oldReplicationUsesDcId ||
oldConfig.tLogPolicy->attributeKeys().count("dcid"); if(oldConfig.usableRegions != newConfig.usableRegions) { //cannot change region configuration std::map dcId_priority; for(auto& it : newConfig.regions) { dcId_priority[it.dcId] = it.priority; } for(auto& it : oldConfig.regions) { if(!dcId_priority.count(it.dcId) || dcId_priority[it.dcId] != it.priority) { return ConfigurationResult::REGIONS_CHANGED; } } //must only have one region with priority >= 0 int activeRegionCount = 0; for(auto& it : newConfig.regions) { if(it.priority >= 0) { activeRegionCount++; } } if(activeRegionCount > 1) { return ConfigurationResult::MULTIPLE_ACTIVE_REGIONS; } } state Future> fServerList = (newConfig.regions.size()) ? tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) : Future>(); if(newConfig.usableRegions==2) { if(oldReplicationUsesDcId) { state Future> fLocalityList = tr.getRange( tagLocalityListKeys, CLIENT_KNOBS->TOO_MANY ); wait( success(fLocalityList) || tooLong ); if(!fLocalityList.isReady()) { return ConfigurationResult::DATABASE_UNAVAILABLE; } Standalone localityList = fLocalityList.get(); ASSERT( !localityList.more && localityList.size() < CLIENT_KNOBS->TOO_MANY ); std::set localityDcIds; for(auto& s : localityList) { auto dc = decodeTagLocalityListKey( s.key ); if(dc.present()) { localityDcIds.insert(dc.get()); } } for(auto& it : newConfig.regions) { if(localityDcIds.count(it.dcId) == 0) { return ConfigurationResult::DCID_MISSING; } } } else { //all regions with priority >= 0 must be fully replicated state std::vector>> replicasFutures; for(auto& it : newConfig.regions) { if(it.priority >= 0) { replicasFutures.push_back(tr.get(datacenterReplicasKeyFor(it.dcId))); } } wait( waitForAll(replicasFutures) || tooLong ); for(auto& it : replicasFutures) { if(!it.isReady()) { return ConfigurationResult::DATABASE_UNAVAILABLE; } if(!it.get().present()) { return ConfigurationResult::REGION_NOT_FULLY_REPLICATED; } } } } if(newConfig.regions.size()) { //all storage servers must be in one of the regions wait( success(fServerList) || tooLong ); if(!fServerList.isReady()) { return ConfigurationResult::DATABASE_UNAVAILABLE; } Standalone serverList = fServerList.get(); ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY ); std::set newDcIds; for(auto& it : newConfig.regions) { newDcIds.insert(it.dcId); } std::set> missingDcIds; for(auto& s : serverList) { auto ssi = decodeServerListValue( s.value ); if ( !ssi.locality.dcId().present() || !newDcIds.count(ssi.locality.dcId().get()) ) { missingDcIds.insert(ssi.locality.dcId()); } } if(missingDcIds.size() > (oldReplicationUsesDcId ? 
1 : 0)) { return ConfigurationResult::STORAGE_IN_UNKNOWN_DCID; } } wait( success(fWorkers) || tooLong ); if(!fWorkers.isReady()) { return ConfigurationResult::DATABASE_UNAVAILABLE; } if(newConfig.regions.size()) { std::map, std::set>> dcId_zoneIds; for(auto& it : fWorkers.get()) { if( it.processClass.machineClassFitness(ProcessClass::Storage) <= ProcessClass::WorstFit ) { dcId_zoneIds[it.locality.dcId()].insert(it.locality.zoneId()); } } for(auto& region : newConfig.regions) { if(dcId_zoneIds[region.dcId].size() < std::max(newConfig.storageTeamSize, newConfig.tLogReplicationFactor)) { return ConfigurationResult::NOT_ENOUGH_WORKERS; } if(region.satelliteTLogReplicationFactor > 0 && region.priority >= 0) { int totalSatelliteProcesses = 0; for(auto& sat : region.satellites) { totalSatelliteProcesses += dcId_zoneIds[sat.dcId].size(); } if(totalSatelliteProcesses < region.satelliteTLogReplicationFactor) { return ConfigurationResult::NOT_ENOUGH_WORKERS; } } } } else { std::set> zoneIds; for(auto& it : fWorkers.get()) { if( it.processClass.machineClassFitness(ProcessClass::Storage) <= ProcessClass::WorstFit ) { zoneIds.insert(it.locality.zoneId()); } } if(zoneIds.size() < std::max(newConfig.storageTeamSize, newConfig.tLogReplicationFactor)) { return ConfigurationResult::NOT_ENOUGH_WORKERS; } } } } if (creating) { tr.setOption( FDBTransactionOptions::INITIALIZE_NEW_DATABASE ); tr.addReadConflictRange( singleKeyRange( initIdKey ) ); } else if (m.size()) { // might be used in an emergency transaction, so make sure it is retry-self-conflicting and CAUSAL_WRITE_RISKY tr.setOption( FDBTransactionOptions::CAUSAL_WRITE_RISKY ); tr.addReadConflictRange( singleKeyRange(m.begin()->first) ); } if (locked.present()) { ASSERT(creating); tr.atomicOp(databaseLockedKey, BinaryWriter::toValue(locked.get(), Unversioned()) .withPrefix(LiteralStringRef("0123456789")) .withSuffix(LiteralStringRef("\x00\x00\x00\x00")), MutationRef::SetVersionstampedValue); } for (auto i = m.begin(); i != m.end(); ++i) { tr.set( StringRef(i->first), StringRef(i->second) ); } tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) ); tr.set( moveKeysLockOwnerKey, versionKey ); wait( tr.commit() ); break; } catch (Error& e) { state Error e1(e); if ( (e.code() == error_code_not_committed || e.code() == error_code_transaction_too_old ) && creating) { // The database now exists. Determine whether we created it or it was already existing/created by someone else. The latter is an error. 
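// This is determined by re-reading \xff/init_id below: if it still contains the random id written by this
// attempt, the creation belongs to us; otherwise someone else created the database first.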
tr.reset(); loop { try { tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); tr.setOption( FDBTransactionOptions::LOCK_AWARE ); tr.setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES ); Optional v = wait( tr.get( initIdKey ) ); if (v != m[initIdKey.toString()]) return ConfigurationResult::DATABASE_ALREADY_CREATED; else return ConfigurationResult::DATABASE_CREATED; } catch (Error& e2) { wait( tr.onError(e2) ); } } } wait( tr.onError(e1) ); } } return ConfigurationResult::SUCCESS; } ConfigureAutoResult parseConfig( StatusObject const& status ) { ConfigureAutoResult result; StatusObjectReader statusObj(status); StatusObjectReader statusObjCluster; if (!statusObj.get("cluster", statusObjCluster)) return ConfigureAutoResult(); StatusObjectReader statusObjConfig; if (!statusObjCluster.get("configuration", statusObjConfig)) return ConfigureAutoResult(); if (!statusObjConfig.get("redundancy.factor", result.old_replication)) return ConfigureAutoResult(); result.auto_replication = result.old_replication; int storage_replication; int log_replication; if( result.old_replication == "single" ) { result.auto_replication = "double"; storage_replication = 2; log_replication = 2; } else if( result.old_replication == "double" || result.old_replication == "fast_recovery_double" ) { storage_replication = 2; log_replication = 2; } else if( result.old_replication == "triple" || result.old_replication == "fast_recovery_triple" ) { storage_replication = 3; log_replication = 3; } else if( result.old_replication == "three_datacenter" ) { storage_replication = 6; log_replication = 4; } else if( result.old_replication == "three_datacenter_fallback" ) { storage_replication = 4; log_replication = 4; } else if( result.old_replication == "three_data_hall" ) { storage_replication = 3; log_replication = 4; } else if( result.old_replication == "three_data_hall_fallback" ) { storage_replication = 2; log_replication = 4; } else return ConfigureAutoResult(); StatusObjectReader machinesMap; if (!statusObjCluster.get("machines", machinesMap)) return ConfigureAutoResult(); std::map machineid_dcid; std::set datacenters; int machineCount = 0; for (auto mach : machinesMap.obj()) { StatusObjectReader machine(mach.second); std::string dcId; if (machine.get("datacenter_id", dcId)) { machineid_dcid[mach.first] = dcId; datacenters.insert(dcId); } machineCount++; } result.machines = machineCount; if(datacenters.size() > 1) return ConfigureAutoResult(); StatusObjectReader processesMap; if (!statusObjCluster.get("processes", processesMap)) return ConfigureAutoResult(); std::set oldMachinesWithTransaction; int oldTransactionProcesses = 0; std::map>> machine_processes; int processCount = 0; for (auto proc : processesMap.obj()){ StatusObjectReader process(proc.second); if (!process.has("excluded") || !process.last().get_bool()) { std::string addrStr; if (!process.get("address", addrStr)) return ConfigureAutoResult(); std::string class_source; if (!process.get("class_source", class_source)) return ConfigureAutoResult(); std::string class_type; if (!process.get("class_type", class_type)) return ConfigureAutoResult(); std::string machineId; if (!process.get("machine_id", machineId)) return ConfigureAutoResult(); NetworkAddress addr = NetworkAddress::parse(addrStr); ProcessClass processClass(class_type, class_source); if(processClass.classType() == ProcessClass::TransactionClass || processClass.classType() == ProcessClass::LogClass) { oldMachinesWithTransaction.insert(machineId); } if(processClass.classType() == 
ProcessClass::TransactionClass || processClass.classType() == ProcessClass::ProxyClass || processClass.classType() == ProcessClass::ResolutionClass || processClass.classType() == ProcessClass::StatelessClass || processClass.classType() == ProcessClass::LogClass) { oldTransactionProcesses++; } if( processClass.classSource() == ProcessClass::AutoSource ) { processClass = ProcessClass(ProcessClass::UnsetClass, ProcessClass::CommandLineSource); result.address_class[addr] = processClass; } if( processClass.classType() != ProcessClass::TesterClass ) { machine_processes[machineId].push_back(std::make_pair(addr, processClass)); processCount++; } } } result.processes = processCount; result.old_processes_with_transaction = oldTransactionProcesses; result.old_machines_with_transaction = oldMachinesWithTransaction.size(); std::map, std::vector>> count_processes; for( auto& it : machine_processes ) { count_processes[std::make_pair(it.second.size(), it.first)] = it.second; } std::set machinesWithTransaction; std::set machinesWithStorage; int totalTransactionProcesses = 0; int existingProxyCount = 0; int existingResolverCount = 0; int existingStatelessCount = 0; for( auto& it : machine_processes ) { for(auto& proc : it.second ) { if(proc.second == ProcessClass::TransactionClass || proc.second == ProcessClass::LogClass) { totalTransactionProcesses++; machinesWithTransaction.insert(it.first); } if(proc.second == ProcessClass::StatelessClass) { existingStatelessCount++; } if(proc.second == ProcessClass::ProxyClass) { existingProxyCount++; } if(proc.second == ProcessClass::ResolutionClass) { existingResolverCount++; } if(proc.second == ProcessClass::StorageClass) { machinesWithStorage.insert(it.first); } if(proc.second == ProcessClass::UnsetClass && proc.second.classSource() == ProcessClass::DBSource) { machinesWithStorage.insert(it.first); } } } if( processCount < 10 ) return ConfigureAutoResult(); result.desired_resolvers = 1; int resolverCount; if (!statusObjConfig.get("resolvers", result.old_resolvers)) { result.old_resolvers = CLIENT_KNOBS->DEFAULT_AUTO_RESOLVERS; statusObjConfig.get("auto_resolvers", result.old_resolvers); result.auto_resolvers = result.desired_resolvers; resolverCount = result.auto_resolvers; } else { result.auto_resolvers = result.old_resolvers; resolverCount = result.old_resolvers; } result.desired_proxies = std::min( 12, processCount / 15 ); int proxyCount; if (!statusObjConfig.get("proxies", result.old_proxies)) { result.old_proxies = CLIENT_KNOBS->DEFAULT_AUTO_PROXIES; statusObjConfig.get("auto_proxies", result.old_proxies); result.auto_proxies = result.desired_proxies; proxyCount = result.auto_proxies; } else { result.auto_proxies = result.old_proxies; proxyCount = result.old_proxies; } result.desired_logs = std::min( 12, processCount / 20 ); result.desired_logs = std::max( result.desired_logs, log_replication + 1 ); result.desired_logs = std::min( result.desired_logs, machine_processes.size() ); int logCount; if (!statusObjConfig.get("logs", result.old_logs)) { result.old_logs = CLIENT_KNOBS->DEFAULT_AUTO_LOGS; statusObjConfig.get("auto_logs", result.old_logs); result.auto_logs = result.desired_logs; logCount = result.auto_logs; } else { result.auto_logs = result.old_logs; logCount = result.old_logs; } logCount = std::max(logCount, log_replication); totalTransactionProcesses += std::min(existingProxyCount, proxyCount); totalTransactionProcesses += std::min(existingResolverCount, resolverCount); totalTransactionProcesses += existingStatelessCount; //if one process on a machine 
is transaction class, make them all transaction class for( auto& it : count_processes ) { if( machinesWithTransaction.count(it.first.second) && !machinesWithStorage.count(it.first.second) ) { for(auto& proc : it.second ) { if(proc.second == ProcessClass::UnsetClass && proc.second.classSource() == ProcessClass::CommandLineSource) { result.address_class[proc.first] = ProcessClass(ProcessClass::TransactionClass, ProcessClass::AutoSource); totalTransactionProcesses++; } } } } int desiredTotalTransactionProcesses = logCount + resolverCount + proxyCount; //add machines with all transaction class until we have enough processes and enough machines for( auto& it : count_processes ) { if( machinesWithTransaction.size() >= logCount && totalTransactionProcesses >= desiredTotalTransactionProcesses ) break; if( !machinesWithTransaction.count(it.first.second) && !machinesWithStorage.count(it.first.second) ) { for(auto& proc : it.second ) { if(proc.second == ProcessClass::UnsetClass && proc.second.classSource() == ProcessClass::CommandLineSource) { ASSERT(proc.second != ProcessClass::TransactionClass); result.address_class[proc.first] = ProcessClass(ProcessClass::TransactionClass, ProcessClass::AutoSource); totalTransactionProcesses++; machinesWithTransaction.insert(it.first.second); } } } } if( machinesWithTransaction.size() < logCount || totalTransactionProcesses < desiredTotalTransactionProcesses ) return ConfigureAutoResult(); result.auto_processes_with_transaction = totalTransactionProcesses; result.auto_machines_with_transaction = machinesWithTransaction.size(); if( 3*totalTransactionProcesses > processCount ) return ConfigureAutoResult(); return result; } ACTOR Future autoConfig( Database cx, ConfigureAutoResult conf ) { state Transaction tr(cx); state Key versionKey = BinaryWriter::toValue(deterministicRandom()->randomUniqueID(),Unversioned()); if(!conf.address_class.size()) return ConfigurationResult::INCOMPLETE_CONFIGURATION; //FIXME: correct return type loop { try { tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS ); tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); tr.setOption( FDBTransactionOptions::LOCK_AWARE ); tr.setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES ); vector workers = wait( getWorkers(&tr) ); std::map>> address_processId; for(auto& w : workers) { address_processId[w.address] = w.locality.processId(); } for(auto& it : conf.address_class) { if( it.second.classSource() == ProcessClass::CommandLineSource ) { tr.clear(processClassKeyFor(address_processId[it.first].get())); } else { tr.set(processClassKeyFor(address_processId[it.first].get()), processClassValue(it.second)); } } if(conf.address_class.size()) tr.set(processClassChangeKey, deterministicRandom()->randomUniqueID().toString()); if(conf.auto_logs != conf.old_logs) tr.set(configKeysPrefix.toString() + "auto_logs", format("%d", conf.auto_logs)); if(conf.auto_proxies != conf.old_proxies) tr.set(configKeysPrefix.toString() + "auto_proxies", format("%d", conf.auto_proxies)); if(conf.auto_resolvers != conf.old_resolvers) tr.set(configKeysPrefix.toString() + "auto_resolvers", format("%d", conf.auto_resolvers)); if( conf.auto_replication != conf.old_replication ) { std::vector modes; modes.push_back(conf.auto_replication); std::map m; auto r = buildConfiguration( modes, m ); if (r != ConfigurationResult::SUCCESS) return r; for(auto& kv : m) tr.set(kv.first, kv.second); } tr.addReadConflictRange( singleKeyRange(moveKeysLockOwnerKey) ); tr.set( moveKeysLockOwnerKey, versionKey ); wait( tr.commit() ); 
return ConfigurationResult::SUCCESS; } catch( Error &e ) { wait( tr.onError(e)); } } } Future changeConfig( Database const& cx, std::vector const& modes, Optional const& conf, bool force ) { if( modes.size() && modes[0] == LiteralStringRef("auto") && conf.present() ) { return autoConfig(cx, conf.get()); } std::map m; auto r = buildConfiguration( modes, m ); if (r != ConfigurationResult::SUCCESS) return r; return changeConfig(cx, m, force); } Future changeConfig( Database const& cx, std::string const& modes, bool force ) { TraceEvent("ChangeConfig").detail("Mode", modes); std::map m; auto r = buildConfiguration( modes, m ); if (r != ConfigurationResult::SUCCESS) return r; return changeConfig(cx, m, force); } ACTOR Future> getWorkers( Transaction* tr ) { state Future> processClasses = tr->getRange( processClassKeys, CLIENT_KNOBS->TOO_MANY ); state Future> processData = tr->getRange( workerListKeys, CLIENT_KNOBS->TOO_MANY ); wait( success(processClasses) && success(processData) ); ASSERT( !processClasses.get().more && processClasses.get().size() < CLIENT_KNOBS->TOO_MANY ); ASSERT( !processData.get().more && processData.get().size() < CLIENT_KNOBS->TOO_MANY ); std::map>,ProcessClass> id_class; for( int i = 0; i < processClasses.get().size(); i++ ) { id_class[decodeProcessClassKey(processClasses.get()[i].key)] = decodeProcessClassValue(processClasses.get()[i].value); } std::vector results; for( int i = 0; i < processData.get().size(); i++ ) { ProcessData data = decodeWorkerListValue(processData.get()[i].value); ProcessClass processClass = id_class[data.locality.processId()]; if(processClass.classSource() == ProcessClass::DBSource || data.processClass.classType() == ProcessClass::UnsetClass) data.processClass = processClass; if(data.processClass.classType() != ProcessClass::TesterClass) results.push_back(data); } return results; } ACTOR Future> getWorkers( Database cx ) { state Transaction tr(cx); loop { try { tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS ); tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); // necessary? tr.setOption( FDBTransactionOptions::LOCK_AWARE ); vector workers = wait( getWorkers(&tr) ); return workers; } catch (Error& e) { wait( tr.onError(e) ); } } } ACTOR Future> getCoordinators( Database cx ) { state Transaction tr(cx); loop { try { tr.setOption( FDBTransactionOptions::LOCK_AWARE ); Optional currentKey = wait( tr.get( coordinatorsKey ) ); if (!currentKey.present()) return std::vector(); return ClusterConnectionString( currentKey.get().toString() ).coordinators(); } catch (Error& e) { wait( tr.onError(e) ); } } } ACTOR Future changeQuorum( Database cx, Reference change ) { state Transaction tr(cx); state int retries = 0; state std::vector desiredCoordinators; state int notEnoughMachineResults = 0; loop { try { tr.setOption( FDBTransactionOptions::LOCK_AWARE ); tr.setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES ); tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); Optional currentKey = wait( tr.get( coordinatorsKey ) ); if (!currentKey.present()) return CoordinatorsResult::BAD_DATABASE_STATE; // Someone deleted this key entirely? state ClusterConnectionString old( currentKey.get().toString() ); if ( cx->getConnectionFile() && old.clusterKeyName().toString() != cx->getConnectionFile()->getConnectionString().clusterKeyName() ) return CoordinatorsResult::BAD_DATABASE_STATE; // Someone changed the "name" of the database?? 
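// From here on: ask the IQuorumChange implementation for the desired coordinator set, verify that the new
// coordinators are reachable, and only then write the new connection string under coordinatorsKey.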
state CoordinatorsResult::Type result = CoordinatorsResult::SUCCESS; if(!desiredCoordinators.size()) { std::vector _desiredCoordinators = wait( change->getDesiredCoordinators( &tr, old.coordinators(), Reference(new ClusterConnectionFile(old)), result ) ); desiredCoordinators = _desiredCoordinators; } if(result == CoordinatorsResult::NOT_ENOUGH_MACHINES && notEnoughMachineResults < 1) { //we could get not_enough_machines if we happen to see the database while the cluster controller is updating the worker list, so make sure it happens twice before returning a failure notEnoughMachineResults++; wait( delay(1.0) ); tr.reset(); continue; } if (result != CoordinatorsResult::SUCCESS) return result; if (!desiredCoordinators.size()) return CoordinatorsResult::INVALID_NETWORK_ADDRESSES; std::sort(desiredCoordinators.begin(), desiredCoordinators.end()); std::string newName = change->getDesiredClusterKeyName(); if (newName.empty()) newName = old.clusterKeyName().toString(); if ( old.coordinators() == desiredCoordinators && old.clusterKeyName() == newName) return retries ? CoordinatorsResult::SUCCESS : CoordinatorsResult::SAME_NETWORK_ADDRESSES; state ClusterConnectionString conn( desiredCoordinators, StringRef( newName + ':' + deterministicRandom()->randomAlphaNumeric( 32 ) ) ); if(g_network->isSimulated()) { for(int i = 0; i < (desiredCoordinators.size()/2)+1; i++) { auto addresses = g_simulator.getProcessByAddress(desiredCoordinators[i])->addresses; g_simulator.protectedAddresses.insert(addresses.address); if(addresses.secondaryAddress.present()) { g_simulator.protectedAddresses.insert(addresses.secondaryAddress.get()); } TraceEvent("ProtectCoordinator").detail("Address", desiredCoordinators[i]).backtrace(); } } TraceEvent("AttemptingQuorumChange").detail("FromCS", old.toString()).detail("ToCS", conn.toString()); TEST(old.clusterKeyName() != conn.clusterKeyName()); // Quorum change with new name TEST(old.clusterKeyName() == conn.clusterKeyName()); // Quorum change with unchanged name vector>> leaderServers; ClientCoordinators coord( Reference( new ClusterConnectionFile( conn ) ) ); for( int i = 0; i < coord.clientLeaderServers.size(); i++ ) leaderServers.push_back( retryBrokenPromise( coord.clientLeaderServers[i].getLeader, GetLeaderRequest( coord.clusterKey, UID() ), TaskPriority::CoordinationReply ) ); choose { when( wait( waitForAll( leaderServers ) ) ) {} when( wait( delay(5.0) ) ) { return CoordinatorsResult::COORDINATOR_UNREACHABLE; } } tr.set( coordinatorsKey, conn.toString() ); wait( tr.commit() ); ASSERT( false ); //commit should fail, but the value has changed } catch (Error& e) { TraceEvent("RetryQuorumChange").error(e).detail("Retries", retries); wait( tr.onError(e) ); ++retries; } } } struct SpecifiedQuorumChange : IQuorumChange { vector desired; explicit SpecifiedQuorumChange( vector const& desired ) : desired(desired) {} virtual Future> getDesiredCoordinators( Transaction* tr, vector oldCoordinators, Reference, CoordinatorsResult::Type& ) { return desired; } }; Reference specifiedQuorumChange(vector const& addresses) { return Reference(new SpecifiedQuorumChange(addresses)); } struct NoQuorumChange : IQuorumChange { virtual Future> getDesiredCoordinators( Transaction* tr, vector oldCoordinators, Reference, CoordinatorsResult::Type& ) { return oldCoordinators; } }; Reference noQuorumChange() { return Reference(new NoQuorumChange); } struct NameQuorumChange : IQuorumChange { std::string newName; Reference otherChange; explicit NameQuorumChange( std::string const& newName, Reference 
const& otherChange ) : newName(newName), otherChange(otherChange) {} virtual Future> getDesiredCoordinators( Transaction* tr, vector oldCoordinators, Reference cf, CoordinatorsResult::Type& t ) { return otherChange->getDesiredCoordinators(tr, oldCoordinators, cf, t); } virtual std::string getDesiredClusterKeyName() { return newName; } }; Reference nameQuorumChange(std::string const& name, Reference const& other) { return Reference(new NameQuorumChange( name, other )); } struct AutoQuorumChange : IQuorumChange { int desired; explicit AutoQuorumChange( int desired ) : desired(desired) {} virtual Future> getDesiredCoordinators( Transaction* tr, vector oldCoordinators, Reference ccf, CoordinatorsResult::Type& err ) { return getDesired( this, tr, oldCoordinators, ccf, &err ); } ACTOR static Future getRedundancy( AutoQuorumChange* self, Transaction* tr ) { state Future> fStorageReplicas = tr->get( LiteralStringRef("storage_replicas").withPrefix( configKeysPrefix ) ); state Future> fLogReplicas = tr->get( LiteralStringRef("log_replicas").withPrefix( configKeysPrefix ) ); wait( success( fStorageReplicas ) && success( fLogReplicas ) ); int redundancy = std::min( atoi( fStorageReplicas.get().get().toString().c_str() ), atoi( fLogReplicas.get().get().toString().c_str() ) ); return redundancy; } ACTOR static Future isAcceptable( AutoQuorumChange* self, Transaction* tr, vector oldCoordinators, Reference ccf, int desiredCount, std::set* excluded ) { // Are there enough coordinators for the redundancy level? if (oldCoordinators.size() < desiredCount) return false; if (oldCoordinators.size() % 2 != 1) return false; // Check availability ClientCoordinators coord(ccf); vector>> leaderServers; for( int i = 0; i < coord.clientLeaderServers.size(); i++ ) leaderServers.push_back( retryBrokenPromise( coord.clientLeaderServers[i].getLeader, GetLeaderRequest( coord.clusterKey, UID() ), TaskPriority::CoordinationReply ) ); Optional>> results = wait( timeout( getAll(leaderServers), CLIENT_KNOBS->IS_ACCEPTABLE_DELAY ) ); if (!results.present()) return false; // Not all responded for(auto& r : results.get()) if (!r.present()) return false; // Coordinator doesn't know about this database? // Check exclusions for(auto& c : oldCoordinators) { if (addressExcluded(*excluded, c)) return false; } // Check locality // FIXME: Actual locality! 
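		// (locality is currently approximated by requiring that no two coordinators share an IP address)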
		std::sort(oldCoordinators.begin(), oldCoordinators.end());
		for (int i = 1; i < oldCoordinators.size(); i++) {
			if (oldCoordinators[i - 1].ip == oldCoordinators[i].ip) {
				return false; // Multiple coordinators share an IP address
			}
		}

		return true;
	}

	ACTOR static Future<vector<NetworkAddress>> getDesired(AutoQuorumChange* self, Transaction* tr,
	                                                       vector<NetworkAddress> oldCoordinators,
	                                                       Reference<ClusterConnectionFile> ccf,
	                                                       CoordinatorsResult::Type* err) {
		state int desiredCount = self->desired;

		if (desiredCount == -1) {
			int redundancy = wait(getRedundancy(self, tr));
			desiredCount = redundancy * 2 - 1;
		}

		std::vector<AddressExclusion> excl = wait(getExcludedServers(tr));
		state std::set<AddressExclusion> excluded(excl.begin(), excl.end());

		vector<ProcessData> _workers = wait(getWorkers(tr));
		state vector<ProcessData> workers = _workers;

		std::map<NetworkAddress, LocalityData> addr_locality;
		for (auto w : workers)
			addr_locality[w.address] = w.locality;

		// since we don't have the locality data for oldCoordinators:
		// check if every old coordinator is in the workers vector and
		// check if multiple old coordinators map to the same locality data (same machine)
		bool checkAcceptable = true;
		std::set<Optional<Standalone<StringRef>>> checkDuplicates;
		for (auto addr : oldCoordinators) {
			auto findResult = addr_locality.find(addr);
			if (findResult == addr_locality.end() || checkDuplicates.count(findResult->second.zoneId())) {
				checkAcceptable = false;
				break;
			}
			checkDuplicates.insert(findResult->second.zoneId());
		}

		if (checkAcceptable) {
			bool ok = wait(isAcceptable(self, tr, oldCoordinators, ccf, desiredCount, &excluded));
			if (ok) return oldCoordinators;
		}

		std::vector<NetworkAddress> chosen;
		self->addDesiredWorkers(chosen, workers, desiredCount, excluded);

		if (chosen.size() < desiredCount) {
			if (chosen.size() < oldCoordinators.size()) {
				TraceEvent("NotEnoughMachinesForCoordinators")
				    .detail("EligibleWorkers", workers.size())
				    .detail("DesiredCoordinators", desiredCount)
				    .detail("CurrentCoordinators", oldCoordinators.size());
				*err = CoordinatorsResult::NOT_ENOUGH_MACHINES;
				return vector<NetworkAddress>();
			}
			chosen.resize((chosen.size() - 1) | 1);
		}

		return chosen;
	}

	// Select a desired set of workers such that
	// (1) the number of workers at each locality type (e.g., dcid) <= desiredCount; and
	// (2) prefer workers at a locality where fewer workers have been chosen than at other localities: evenly
	//     distribute workers.
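	// The per-field caps (maxCounts) all start at 1 and are raised one field at a time (up to hardLimits) whenever
	// no remaining worker fits, so selection spreads across dcid/data_hall/zoneid/machineid as evenly as possible;
	// zoneid is hard-capped at 1, i.e. at most one coordinator per zone.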
void addDesiredWorkers(vector& chosen, const vector& workers, int desiredCount, const std::set& excluded) { vector remainingWorkers(workers); deterministicRandom()->randomShuffle(remainingWorkers); std::partition(remainingWorkers.begin(), remainingWorkers.end(), [](const ProcessData& data) { return (data.processClass == ProcessClass::CoordinatorClass); }); TraceEvent(SevDebug, "AutoSelectCoordinators").detail("CandidateWorkers", remainingWorkers.size()); for (auto worker = remainingWorkers.begin(); worker != remainingWorkers.end(); worker++) { TraceEvent(SevDebug, "AutoSelectCoordinators") .detail("Worker", worker->processClass.toString()) .detail("Address", worker->address.toString()) .detail("Locality", worker->locality.toString()); } TraceEvent(SevDebug, "AutoSelectCoordinators").detail("ExcludedAddress", excluded.size()); for (auto& excludedAddr : excluded) { TraceEvent(SevDebug, "AutoSelectCoordinators").detail("ExcludedAddress", excludedAddr.toString()); } std::map maxCounts; std::map> currentCounts; std::map hardLimits; vector fields({ LiteralStringRef("dcid"), LiteralStringRef("data_hall"), LiteralStringRef("zoneid"), LiteralStringRef("machineid") }); for(auto field = fields.begin(); field != fields.end(); field++) { if(field->toString() == "zoneid") { hardLimits[*field] = 1; } else { hardLimits[*field] = desiredCount; } } while(chosen.size() < desiredCount) { bool found = false; for (auto worker = remainingWorkers.begin(); worker != remainingWorkers.end(); worker++) { if(addressExcluded(excluded, worker->address)) { continue; } // Exclude faulty node due to machine assassination if (g_network->isSimulated() && g_simulator.protectedAddresses.count(worker->address) && !g_simulator.getProcessByAddress(worker->address)->isReliable()) { TraceEvent("AutoSelectCoordinators").detail("SkipUnreliableWorker", worker->address.toString()); continue; } bool valid = true; for(auto field = fields.begin(); field != fields.end(); field++) { if(maxCounts[*field] == 0) { maxCounts[*field] = 1; } auto value = worker->locality.get(*field).orDefault(LiteralStringRef("")); auto currentCount = currentCounts[*field][value]; if(currentCount >= maxCounts[*field]) { valid = false; break; } } if(valid) { for(auto field = fields.begin(); field != fields.end(); field++) { auto value = worker->locality.get(*field).orDefault(LiteralStringRef("")); currentCounts[*field][value] += 1; } chosen.push_back(worker->address); remainingWorkers.erase(worker); found = true; break; } } if(!found) { bool canIncrement = false; for(auto field = fields.begin(); field != fields.end(); field++) { if(maxCounts[*field] < hardLimits[*field]) { maxCounts[*field] += 1; canIncrement = true; break; } } if(!canIncrement) { break; } } } } }; Reference autoQuorumChange( int desired ) { return Reference(new AutoQuorumChange(desired)); } void excludeServers(Transaction& tr, vector& servers, bool failed) { tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS ); tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); tr.setOption( FDBTransactionOptions::LOCK_AWARE ); tr.setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES ); std::string excludeVersionKey = deterministicRandom()->randomUniqueID().toString(); auto serversVersionKey = failed ? 
failedServersVersionKey : excludedServersVersionKey; tr.addReadConflictRange( singleKeyRange(serversVersionKey) ); //To conflict with parallel includeServers tr.set( serversVersionKey, excludeVersionKey ); for(auto& s : servers) { if (failed) { tr.set( encodeFailedServersKey(s), StringRef() ); } else { tr.set( encodeExcludedServersKey(s), StringRef() ); } } TraceEvent("ExcludeServersCommit").detail("Servers", describe(servers)).detail("ExcludeFailed", failed); } ACTOR Future excludeServers(Database cx, vector servers, bool failed) { if (cx->apiVersionAtLeast(700)) { state ReadYourWritesTransaction ryw(cx); loop { try{ ryw.setOption( FDBTransactionOptions::SPECIAL_KEY_SPACE_CHANGE_CONFIGURATION); ryw.set(SpecialKeySpace::getManagementApiCommandOptionSpecialKey(failed ? "failed" : "exclude", "force"), ValueRef()); for(auto& s : servers) { Key addr = failed ? SpecialKeySpace::getManagementApiCommandPrefix("failed").withSuffix(s.toString()) : SpecialKeySpace::getManagementApiCommandPrefix("exclude").withSuffix(s.toString()); ryw.set(addr, ValueRef()); } TraceEvent("ExcludeServersSpecialKeySpaceCommit").detail("Servers", describe(servers)).detail("ExcludeFailed", failed); wait(ryw.commit()); return Void(); } catch (Error& e) { wait( ryw.onError(e) ); } } } else { state Transaction tr(cx); loop { try { excludeServers(tr, servers, failed); wait( tr.commit() ); return Void(); } catch (Error& e) { wait( tr.onError(e) ); } } } } ACTOR Future includeServers(Database cx, vector servers, bool failed) { state std::string versionKey = deterministicRandom()->randomUniqueID().toString(); if (cx->apiVersionAtLeast(700)) { state ReadYourWritesTransaction ryw(cx); loop { try { ryw.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_CHANGE_CONFIGURATION); for (auto& s : servers) { if (!s.isValid()) { if (failed) { ryw.clear(failedServersKeys.withPrefix(normalKeys.end)); } else { ryw.clear(excludedServersKeys.withPrefix(normalKeys.end)); } } else { Key addr = failed ? SpecialKeySpace::getManagementApiCommandPrefix("failed").withSuffix(s.toString()) : SpecialKeySpace::getManagementApiCommandPrefix("exclude").withSuffix(s.toString()); ryw.clear(addr); // Eliminate both any ip-level exclusion (1.2.3.4) and any // port-level exclusions (1.2.3.4:5) // The range ['IP', 'IP;'] was originally deleted. ';' is // char(':' + 1). This does not work, as other for all // x between 0 and 9, 'IPx' will also be in this range. // // This is why we now make two clears: first only of the ip // address, the second will delete all ports. 
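					// e.g. clearing "1.2.3.4" removes an ip-level exclusion, and clearing ["1.2.3.4:", "1.2.3.4;")
					// removes any port-level entries such as "1.2.3.4:5" without touching "1.2.3.40".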
if (s.isWholeMachine()) ryw.clear(KeyRangeRef(addr.withSuffix(LiteralStringRef(":")), addr.withSuffix(LiteralStringRef(";")))); } } TraceEvent("IncludeServersCommit").detail("Servers", describe(servers)).detail("Failed", failed); wait( ryw.commit() ); return Void(); } catch (Error& e) { TraceEvent("IncludeServersError").error(e, true); wait( ryw.onError(e) ); } } } else { state Transaction tr(cx); loop { try { tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS ); tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); tr.setOption( FDBTransactionOptions::LOCK_AWARE ); tr.setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES ); // includeServers might be used in an emergency transaction, so make sure it is retry-self-conflicting and CAUSAL_WRITE_RISKY tr.setOption( FDBTransactionOptions::CAUSAL_WRITE_RISKY ); if (failed) { tr.addReadConflictRange(singleKeyRange(failedServersVersionKey)); tr.set(failedServersVersionKey, versionKey); } else { tr.addReadConflictRange(singleKeyRange(excludedServersVersionKey)); tr.set(excludedServersVersionKey, versionKey); } for(auto& s : servers ) { if (!s.isValid()) { if (failed) { tr.clear(failedServersKeys); } else { tr.clear(excludedServersKeys); } } else if (s.isWholeMachine()) { // Eliminate both any ip-level exclusion (1.2.3.4) and any // port-level exclusions (1.2.3.4:5) // The range ['IP', 'IP;'] was originally deleted. ';' is // char(':' + 1). This does not work, as other for all // x between 0 and 9, 'IPx' will also be in this range. // // This is why we now make two clears: first only of the ip // address, the second will delete all ports. auto addr = failed ? encodeFailedServersKey(s) : encodeExcludedServersKey(s); tr.clear(singleKeyRange(addr)); tr.clear(KeyRangeRef(addr + ':', addr + char(':' + 1))); } else { if (failed) { tr.clear(encodeFailedServersKey(s)); } else { tr.clear(encodeExcludedServersKey(s)); } } } TraceEvent("IncludeServersCommit").detail("Servers", describe(servers)).detail("Failed", failed); wait( tr.commit() ); return Void(); } catch (Error& e) { TraceEvent("IncludeServersError").error(e, true); wait( tr.onError(e) ); } } } } ACTOR Future setClass( Database cx, AddressExclusion server, ProcessClass processClass ) { state Transaction tr(cx); loop { try { tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS ); tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); tr.setOption( FDBTransactionOptions::LOCK_AWARE ); tr.setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES ); vector workers = wait( getWorkers(&tr) ); bool foundChange = false; for(int i = 0; i < workers.size(); i++) { if( server.excludes(workers[i].address) ) { if(processClass.classType() != ProcessClass::InvalidClass) tr.set(processClassKeyFor(workers[i].locality.processId().get()), processClassValue(processClass)); else tr.clear(processClassKeyFor(workers[i].locality.processId().get())); foundChange = true; } } if(foundChange) tr.set(processClassChangeKey, deterministicRandom()->randomUniqueID().toString()); wait( tr.commit() ); return Void(); } catch (Error& e) { wait( tr.onError(e) ); } } } ACTOR Future> getExcludedServers( Transaction* tr ) { state Standalone r = wait( tr->getRange( excludedServersKeys, CLIENT_KNOBS->TOO_MANY ) ); ASSERT( !r.more && r.size() < CLIENT_KNOBS->TOO_MANY ); state Standalone r2 = wait( tr->getRange( failedServersKeys, CLIENT_KNOBS->TOO_MANY ) ); ASSERT( !r2.more && r2.size() < CLIENT_KNOBS->TOO_MANY ); vector exclusions; for(auto i = r.begin(); i != r.end(); ++i) { auto a = 
decodeExcludedServersKey( i->key ); if (a.isValid()) exclusions.push_back( a ); } for(auto i = r2.begin(); i != r2.end(); ++i) { auto a = decodeFailedServersKey( i->key ); if (a.isValid()) exclusions.push_back( a ); } uniquify(exclusions); return exclusions; } ACTOR Future> getExcludedServers( Database cx ) { state Transaction tr(cx); loop { try { tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS ); tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); // necessary? tr.setOption( FDBTransactionOptions::LOCK_AWARE ); vector exclusions = wait( getExcludedServers(&tr) ); return exclusions; } catch (Error& e) { wait( tr.onError(e) ); } } } ACTOR Future printHealthyZone( Database cx ) { state Transaction tr(cx); loop { try { tr.setOption(FDBTransactionOptions::LOCK_AWARE); Optional val = wait( tr.get(healthyZoneKey) ); if (val.present() && decodeHealthyZoneValue(val.get()).first == ignoreSSFailuresZoneString) { printf("Data distribution has been disabled for all storage server failures in this cluster and thus " "maintenance mode is not active.\n"); } else if(!val.present() || decodeHealthyZoneValue(val.get()).second <= tr.getReadVersion().get()) { printf("No ongoing maintenance.\n"); } else { auto healthyZone = decodeHealthyZoneValue(val.get()); printf("Maintenance for zone %s will continue for %" PRId64 " seconds.\n", healthyZone.first.toString().c_str(), (healthyZone.second-tr.getReadVersion().get())/CLIENT_KNOBS->CORE_VERSIONSPERSECOND); } return Void(); } catch( Error &e ) { wait(tr.onError(e)); } } } ACTOR Future clearHealthyZone(Database cx, bool printWarning, bool clearSSFailureZoneString) { state Transaction tr(cx); TraceEvent("ClearHealthyZone").detail("ClearSSFailureZoneString", clearSSFailureZoneString); loop { try { tr.setOption(FDBTransactionOptions::LOCK_AWARE); tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); Optional val = wait(tr.get(healthyZoneKey)); if (!clearSSFailureZoneString && val.present() && decodeHealthyZoneValue(val.get()).first == ignoreSSFailuresZoneString) { if (printWarning) { printf("ERROR: Maintenance mode cannot be used while data distribution is disabled for storage " "server failures. Use 'datadistribution on' to reenable data distribution.\n"); } return false; } tr.clear(healthyZoneKey); wait(tr.commit()); return true; } catch( Error &e ) { wait(tr.onError(e)); } } } ACTOR Future setHealthyZone(Database cx, StringRef zoneId, double seconds, bool printWarning) { state Transaction tr(cx); TraceEvent("SetHealthyZone").detail("Zone", zoneId).detail("DurationSeconds", seconds); loop { try { tr.setOption(FDBTransactionOptions::LOCK_AWARE); tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); Optional val = wait(tr.get(healthyZoneKey)); if (val.present() && decodeHealthyZoneValue(val.get()).first == ignoreSSFailuresZoneString) { if (printWarning) { printf("ERROR: Maintenance mode cannot be used while data distribution is disabled for storage " "server failures. 
Use 'datadistribution on' to reenable data distribution.\n"); } return false; } Version readVersion = wait(tr.getReadVersion()); tr.set(healthyZoneKey, healthyZoneValue(zoneId, readVersion + (seconds*CLIENT_KNOBS->CORE_VERSIONSPERSECOND))); wait(tr.commit()); return true; } catch( Error &e ) { wait(tr.onError(e)); } } } ACTOR Future setDDIgnoreRebalanceSwitch(Database cx, bool ignoreRebalance) { state Transaction tr(cx); loop { try { tr.setOption(FDBTransactionOptions::LOCK_AWARE); if (ignoreRebalance) { tr.set(rebalanceDDIgnoreKey, LiteralStringRef("on")); } else { tr.clear(rebalanceDDIgnoreKey); } wait(tr.commit()); return Void(); } catch (Error& e) { wait(tr.onError(e)); } } } ACTOR Future setDDMode( Database cx, int mode ) { state Transaction tr(cx); state int oldMode = -1; state BinaryWriter wr(Unversioned()); wr << mode; loop { try { Optional old = wait( tr.get( dataDistributionModeKey ) ); if (oldMode < 0) { oldMode = 1; if (old.present()) { BinaryReader rd(old.get(), Unversioned()); rd >> oldMode; } } BinaryWriter wrMyOwner(Unversioned()); wrMyOwner << dataDistributionModeLock; tr.set( moveKeysLockOwnerKey, wrMyOwner.toValue() ); BinaryWriter wrLastWrite(Unversioned()); wrLastWrite << deterministicRandom()->randomUniqueID(); tr.set( moveKeysLockWriteKey, wrLastWrite.toValue() ); tr.set( dataDistributionModeKey, wr.toValue() ); if (mode) { // set DDMode to 1 will enable all disabled parts, for instance the SS failure monitors. Optional currentHealthyZoneValue = wait(tr.get(healthyZoneKey)); if (currentHealthyZoneValue.present() && decodeHealthyZoneValue(currentHealthyZoneValue.get()).first == ignoreSSFailuresZoneString) { // only clear the key if it is currently being used to disable all SS failure data movement tr.clear(healthyZoneKey); } tr.clear(rebalanceDDIgnoreKey); } wait( tr.commit() ); return oldMode; } catch (Error& e) { TraceEvent("SetDDModeRetrying").error(e); wait (tr.onError(e)); } } } ACTOR Future checkForExcludingServersTxActor(Transaction* tr, std::set* exclusions, std::set* inProgressExclusion) { // TODO : replace using ExclusionInProgressRangeImpl in special key space ASSERT(inProgressExclusion->size() == 0); // Make sure every time it is cleared beforehand if (!exclusions->size()) return true; tr->setOption( FDBTransactionOptions::READ_SYSTEM_KEYS ); tr->setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); // necessary? 
tr->setOption( FDBTransactionOptions::LOCK_AWARE ); // Just getting a consistent read version proves that a set of tlogs satisfying the exclusions has completed recovery // Check that there aren't any storage servers with addresses violating the exclusions Standalone serverList = wait( tr->getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) ); ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY ); state bool ok = true; for(auto& s : serverList) { auto addresses = decodeServerListValue( s.value ).getKeyValues.getEndpoint().addresses; if ( addressExcluded(*exclusions, addresses.address) ) { ok = false; inProgressExclusion->insert(addresses.address); } if ( addresses.secondaryAddress.present() && addressExcluded(*exclusions, addresses.secondaryAddress.get()) ) { ok = false; inProgressExclusion->insert(addresses.secondaryAddress.get()); } } if (ok) { Optional> value = wait( tr->get(logsKey) ); ASSERT(value.present()); auto logs = decodeLogsValue(value.get()); for( auto const& log : logs.first ) { if (log.second == NetworkAddress() || addressExcluded(*exclusions, log.second)) { ok = false; inProgressExclusion->insert(log.second); } } for( auto const& log : logs.second ) { if (log.second == NetworkAddress() || addressExcluded(*exclusions, log.second)) { ok = false; inProgressExclusion->insert(log.second); } } } return ok; } ACTOR Future> checkForExcludingServers(Database cx, vector excl, bool waitForAllExcluded) { state std::set exclusions( excl.begin(), excl.end() ); state std::set inProgressExclusion; loop { state Transaction tr(cx); inProgressExclusion.clear(); try { bool ok = wait(checkForExcludingServersTxActor(&tr, &exclusions, &inProgressExclusion)); if (ok) return inProgressExclusion; if (!waitForAllExcluded) break; wait( delayJittered( 1.0 ) ); // SOMEDAY: watches! 
} catch (Error& e) { wait( tr.onError(e) ); } } return inProgressExclusion; } ACTOR Future mgmtSnapCreate(Database cx, Standalone snapCmd, UID snapUID) { try { wait(snapCreate(cx, snapCmd, snapUID)); TraceEvent("SnapCreateSucceeded").detail("snapUID", snapUID); return Void(); } catch (Error& e) { TraceEvent(SevWarn, "SnapCreateFailed").detail("snapUID", snapUID).error(e); throw; } } ACTOR Future waitForFullReplication( Database cx ) { state ReadYourWritesTransaction tr(cx); loop { try { tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS ); tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); tr.setOption( FDBTransactionOptions::LOCK_AWARE ); Standalone confResults = wait( tr.getRange(configKeys, CLIENT_KNOBS->TOO_MANY) ); ASSERT( !confResults.more && confResults.size() < CLIENT_KNOBS->TOO_MANY ); state DatabaseConfiguration config; config.fromKeyValues((VectorRef) confResults); state std::vector>> replicasFutures; for(auto& region : config.regions) { replicasFutures.push_back(tr.get(datacenterReplicasKeyFor(region.dcId))); } wait( waitForAll(replicasFutures) ); state std::vector> watchFutures; for(int i = 0; i < config.regions.size(); i++) { if( !replicasFutures[i].get().present() || decodeDatacenterReplicasValue(replicasFutures[i].get().get()) < config.storageTeamSize ) { watchFutures.push_back(tr.watch(datacenterReplicasKeyFor(config.regions[i].dcId))); } } if( !watchFutures.size() || (config.usableRegions == 1 && watchFutures.size() < config.regions.size())) { return Void(); } wait( tr.commit() ); wait( waitForAny(watchFutures) ); tr.reset(); } catch (Error& e) { wait( tr.onError(e) ); } } } ACTOR Future timeKeeperSetDisable(Database cx) { loop { state Transaction tr(cx); try { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr.setOption(FDBTransactionOptions::LOCK_AWARE); tr.set(timeKeeperDisableKey, StringRef()); wait(tr.commit()); return Void(); } catch (Error &e) { wait(tr.onError(e)); } } } ACTOR Future lockDatabase( Transaction* tr, UID id ) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); Optional val = wait( tr->get(databaseLockedKey) ); if(val.present()) { if(BinaryReader::fromStringRef(val.get().substr(10), Unversioned()) == id) { return Void(); } else { //TraceEvent("DBA_LockLocked").detail("Expecting", id).detail("Lock", BinaryReader::fromStringRef(val.get().substr(10), Unversioned())); throw database_locked(); } } tr->atomicOp(databaseLockedKey, BinaryWriter::toValue(id, Unversioned()).withPrefix(LiteralStringRef("0123456789")).withSuffix(LiteralStringRef("\x00\x00\x00\x00")), MutationRef::SetVersionstampedValue); tr->addWriteConflictRange(normalKeys); return Void(); } ACTOR Future lockDatabase( Reference tr, UID id ) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); Optional val = wait( tr->get(databaseLockedKey) ); if(val.present()) { if(BinaryReader::fromStringRef(val.get().substr(10), Unversioned()) == id) { return Void(); } else { //TraceEvent("DBA_LockLocked").detail("Expecting", id).detail("Lock", BinaryReader::fromStringRef(val.get().substr(10), Unversioned())); throw database_locked(); } } tr->atomicOp(databaseLockedKey, BinaryWriter::toValue(id, Unversioned()).withPrefix(LiteralStringRef("0123456789")).withSuffix(LiteralStringRef("\x00\x00\x00\x00")), MutationRef::SetVersionstampedValue); tr->addWriteConflictRange(normalKeys); return Void(); } ACTOR Future lockDatabase( Database cx, UID id ) { state Transaction tr(cx); 
ACTOR Future<Void> lockDatabase( Database cx, UID id ) {
	state Transaction tr(cx);
	loop {
		try {
			wait( lockDatabase(&tr, id) );
			wait( tr.commit() );
			return Void();
		} catch( Error &e ) {
			if(e.code() == error_code_database_locked)
				throw e;
			wait( tr.onError(e) );
		}
	}
}

ACTOR Future<Void> unlockDatabase( Transaction* tr, UID id ) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);
	Optional<Value> val = wait( tr->get(databaseLockedKey) );

	if(!val.present())
		return Void();

	if(val.present() && BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()) != id) {
		//TraceEvent("DBA_UnlockLocked").detail("Expecting", id).detail("Lock", BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()));
		throw database_locked();
	}

	tr->clear(singleKeyRange(databaseLockedKey));
	return Void();
}

ACTOR Future<Void> unlockDatabase( Reference<ReadYourWritesTransaction> tr, UID id ) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);
	Optional<Value> val = wait( tr->get(databaseLockedKey) );

	if(!val.present())
		return Void();

	if(val.present() && BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()) != id) {
		//TraceEvent("DBA_UnlockLocked").detail("Expecting", id).detail("Lock", BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()));
		throw database_locked();
	}

	tr->clear(singleKeyRange(databaseLockedKey));
	return Void();
}

ACTOR Future<Void> unlockDatabase( Database cx, UID id ) {
	state Transaction tr(cx);
	loop {
		try {
			wait( unlockDatabase(&tr, id) );
			wait( tr.commit() );
			return Void();
		} catch( Error &e ) {
			if(e.code() == error_code_database_locked)
				throw e;
			wait( tr.onError(e) );
		}
	}
}

ACTOR Future<Void> checkDatabaseLock( Transaction* tr, UID id ) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);
	Optional<Value> val = wait( tr->get(databaseLockedKey) );

	if (val.present() && BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()) != id) {
		//TraceEvent("DBA_CheckLocked").detail("Expecting", id).detail("Lock", BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned())).backtrace();
		throw database_locked();
	}

	return Void();
}

ACTOR Future<Void> checkDatabaseLock( Reference<ReadYourWritesTransaction> tr, UID id ) {
	tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr->setOption(FDBTransactionOptions::LOCK_AWARE);
	Optional<Value> val = wait( tr->get(databaseLockedKey) );

	if (val.present() && BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned()) != id) {
		//TraceEvent("DBA_CheckLocked").detail("Expecting", id).detail("Lock", BinaryReader::fromStringRef<UID>(val.get().substr(10), Unversioned())).backtrace();
		throw database_locked();
	}

	return Void();
}

ACTOR Future<Void> advanceVersion(Database cx, Version v) {
	state Transaction tr(cx);
	loop {
		tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		tr.setOption(FDBTransactionOptions::LOCK_AWARE);
		try {
			Version rv = wait(tr.getReadVersion());
			if (rv <= v) {
				tr.set(minRequiredCommitVersionKey, BinaryWriter::toValue(v + 1, Unversioned()));
				wait(tr.commit());
			} else {
				printf("Current read version is %ld\n", rv);
				return Void();
			}
		} catch (Error& e) {
			wait(tr.onError(e));
		}
	}
}
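// advanceVersion above keeps writing minRequiredCommitVersionKey until the cluster's read version
// has passed the requested version; a caller simply does `wait(advanceVersion(db, v))` with a
// hypothetical Database `db`. forceRecovery below waits until a ClusterInterface is known and then
// keeps offering a ForceRecoveryRequest for the given dcId, retrying whenever the interface changes.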
ACTOR Future<Void> forceRecovery( Reference<ClusterConnectionFile> clusterFile, Key dcId ) {
	state Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface(new AsyncVar<Optional<ClusterInterface>>);
	state Future<Void> leaderMon = monitorLeader(clusterFile, clusterInterface);

	loop {
		choose {
			when ( wait( clusterInterface->get().present() ? brokenPromiseToNever( clusterInterface->get().get().forceRecovery.getReply( ForceRecoveryRequest(dcId) ) ) : Never() ) ) {
				return Void();
			}
			when ( wait( clusterInterface->onChange() ) ) {}
		}
	}
}

ACTOR Future<Void> waitForPrimaryDC( Database cx, StringRef dcId ) {
	state ReadYourWritesTransaction tr(cx);

	loop {
		try {
			tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
			Optional<Value> res = wait( tr.get(primaryDatacenterKey) );
			if(res.present() && res.get() == dcId) {
				return Void();
			}

			state Future<Void> watchFuture = tr.watch(primaryDatacenterKey);
			wait(tr.commit());
			wait(watchFuture);
			tr.reset();
		} catch (Error& e) {
			wait( tr.onError(e) );
		}
	}
}

ACTOR Future<Void> changeCachedRange(Database cx, KeyRangeRef range, bool add) {
	state ReadYourWritesTransaction tr(cx);
	state KeyRange sysRange = KeyRangeRef(storageCacheKey(range.begin), storageCacheKey(range.end));
	state KeyRange sysRangeClear = KeyRangeRef(storageCacheKey(range.begin), keyAfter(storageCacheKey(range.end)));
	state KeyRange privateRange = KeyRangeRef(cacheKeysKey(0, range.begin), cacheKeysKey(0, range.end));
	state Value trueValue = storageCacheValue(std::vector<uint16_t>{ 0 });
	state Value falseValue = storageCacheValue(std::vector<uint16_t>{});
	loop {
		tr.setOption(FDBTransactionOptions::LOCK_AWARE);
		tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
		try {
			tr.clear(sysRangeClear);
			tr.clear(privateRange);
			tr.addReadConflictRange(privateRange);
			Standalone<RangeResultRef> previous = wait(tr.getRange(KeyRangeRef(storageCachePrefix, sysRange.begin), 1, true));
			bool prevIsCached = false;
			if (!previous.empty()) {
				std::vector<uint16_t> prevVal;
				decodeStorageCacheValue(previous[0].value, prevVal);
				prevIsCached = !prevVal.empty();
			}
			if (prevIsCached && !add) {
				// we need to uncache from here
				tr.set(sysRange.begin, falseValue);
				tr.set(privateRange.begin, serverKeysFalse);
			} else if (!prevIsCached && add) {
				// we need to cache, starting from here
				tr.set(sysRange.begin, trueValue);
				tr.set(privateRange.begin, serverKeysTrue);
			}
			Standalone<RangeResultRef> after = wait(tr.getRange(KeyRangeRef(sysRange.end, storageCacheKeys.end), 1, false));
			bool afterIsCached = false;
			if (!after.empty()) {
				std::vector<uint16_t> afterVal;
				decodeStorageCacheValue(after[0].value, afterVal);
				afterIsCached = afterVal.empty();
			}
			if (afterIsCached && !add) {
				tr.set(sysRange.end, trueValue);
				tr.set(privateRange.end, serverKeysTrue);
			} else if (!afterIsCached && add) {
				tr.set(sysRange.end, falseValue);
				tr.set(privateRange.end, serverKeysFalse);
			}
			wait(tr.commit());
			return Void();
		} catch (Error& e) {
			state Error err = e;
			wait(tr.onError(err));
			TraceEvent(SevDebug, "ChangeCachedRangeError").error(err);
		}
	}
}

Future<Void> addCachedRange(const Database& cx, KeyRangeRef range) {
	return changeCachedRange(cx, range, true);
}

Future<Void> removeCachedRange(const Database& cx, KeyRangeRef range) {
	return changeCachedRange(cx, range, false);
}

json_spirit::Value_type normJSONType(json_spirit::Value_type type) {
	if (type == json_spirit::int_type)
		return json_spirit::real_type;
	return type;
}

void schemaCoverage( std::string const& spath, bool covered ) {
	static std::map<bool, std::set<std::string>> coveredSchemaPaths;
	if( coveredSchemaPaths[covered].insert(spath).second ) {
		TraceEvent ev(SevInfo, "CodeCoverage");
		ev.detail("File", "documentation/StatusSchema.json/" + spath).detail("Line", 0);
		if (!covered)
			ev.detail("Covered", 0);
	}
}
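// schemaMatch below recursively validates a json_spirit value against a schema document. Two
// conventions are recognized for object members: a schema object containing "$enum" lists the only
// permitted values for that key, and one containing "$map" gives a sub-schema that every value of
// the corresponding object must satisfy. A purely illustrative fragment (not taken from
// StatusSchema.json) showing the $enum form:
//
//     { "state": { "$enum": [ "healthy", "degraded" ] } }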
bool schemaMatch( json_spirit::mValue const& schemaValue, json_spirit::mValue const& resultValue, std::string& errorStr, Severity sev, bool checkCoverage, std::string path, std::string schemaPath ) {
	// Returns true if everything in `result` is permitted by `schema`
	bool ok = true;

	try {
		if(normJSONType(schemaValue.type()) != normJSONType(resultValue.type())) {
			errorStr += format("ERROR: Incorrect value type for key `%s'\n", path.c_str());
			TraceEvent(sev, "SchemaMismatch").detail("Path", path).detail("SchemaType", schemaValue.type()).detail("ValueType", resultValue.type());
			return false;
		}

		if(resultValue.type() == json_spirit::obj_type) {
			auto& result = resultValue.get_obj();
			auto& schema = schemaValue.get_obj();

			for(auto& rkv : result) {
				auto& key = rkv.first;
				auto& rv = rkv.second;
				std::string kpath = path + "." + key;
				std::string spath = schemaPath + "." + key;

				if(checkCoverage) {
					schemaCoverage(spath);
				}

				if(!schema.count(key)) {
					errorStr += format("ERROR: Unknown key `%s'\n", kpath.c_str());
					TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaPath", spath);
					ok = false;
					continue;
				}
				auto& sv = schema.at(key);

				if(sv.type() == json_spirit::obj_type && sv.get_obj().count("$enum")) {
					auto& enum_values = sv.get_obj().at("$enum").get_array();

					bool any_match = false;
					for(auto& enum_item : enum_values)
						if(enum_item == rv) {
							any_match = true;
							if(checkCoverage) {
								schemaCoverage(spath + ".$enum." + enum_item.get_str());
							}
							break;
						}
					if(!any_match) {
						errorStr += format("ERROR: Unknown value `%s' for key `%s'\n", json_spirit::write_string(rv).c_str(), kpath.c_str());
						TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaEnumItems", enum_values.size()).detail("Value", json_spirit::write_string(rv));
						if(checkCoverage) {
							schemaCoverage(spath + ".$enum." + json_spirit::write_string(rv));
						}
						ok = false;
					}
				} else if(sv.type() == json_spirit::obj_type && sv.get_obj().count("$map")) {
					if(rv.type() != json_spirit::obj_type) {
						errorStr += format("ERROR: Expected an object as the value for key `%s'\n", kpath.c_str());
						TraceEvent(sev, "SchemaMismatch").detail("Path", kpath).detail("SchemaType", sv.type()).detail("ValueType", rv.type());
						ok = false;
						continue;
					}
					if(sv.get_obj().at("$map").type() != json_spirit::obj_type) {
						continue;
					}
					auto& schemaVal = sv.get_obj().at("$map");
					auto& valueObj = rv.get_obj();

					if(checkCoverage) {
						schemaCoverage(spath + ".$map");
					}

					for(auto& valuePair : valueObj) {
						auto vpath = kpath + "[" + valuePair.first + "]";
						auto upath = spath + ".$map";
						if (valuePair.second.type() != json_spirit::obj_type) {
							errorStr += format("ERROR: Expected an object for `%s'\n", vpath.c_str());
							TraceEvent(sev, "SchemaMismatch").detail("Path", vpath).detail("ValueType", valuePair.second.type());
							ok = false;
							continue;
						}
						if(!schemaMatch(schemaVal, valuePair.second, errorStr, sev, checkCoverage, vpath, upath)) {
							ok = false;
						}
					}
				} else {
					if(!schemaMatch(sv, rv, errorStr, sev, checkCoverage, kpath, spath)) {
						ok = false;
					}
				}
			}
		} else if(resultValue.type() == json_spirit::array_type) {
			auto& valueArray = resultValue.get_array();
			auto& schemaArray = schemaValue.get_array();
			if(!schemaArray.size()) {
				// An empty schema array means that the value array is required to be empty
				if(valueArray.size()) {
					errorStr += format("ERROR: Expected an empty array for key `%s'\n", path.c_str());
					TraceEvent(sev, "SchemaMismatch").detail("Path", path).detail("SchemaSize", schemaArray.size()).detail("ValueSize", valueArray.size());
					return false;
				}
			} else if(schemaArray.size() == 1) {
				// A one item schema array means that all items in the value must match the first item in the schema
				int index = 0;
				for(auto &valueItem : valueArray) {
					if(!schemaMatch(schemaArray[0], valueItem, errorStr, sev, checkCoverage, path + format("[%d]", index), schemaPath + "[0]")) {
						ok = false;
					}
					index++;
				}
			} else {
				ASSERT(false);  // Schema doesn't make sense
			}
		}
		return ok;
	} catch (std::exception& e) {
		TraceEvent(SevError, "SchemaMatchException").detail("What", e.what()).detail("Path", path).detail("SchemaPath", schemaPath);
		throw unknown_error();
	}
}
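// A minimal sketch of invoking schemaMatch directly, assuming hypothetical JSON strings
// `schemaText` and `statusText` obtained elsewhere:
//
//     json_spirit::mValue schema, result;
//     json_spirit::read_string(schemaText, schema);
//     json_spirit::read_string(statusText, result);
//     std::string errorStr;
//     bool ok = schemaMatch(schema, result, errorStr, SevError, false, "status", "statusSchema");
//     // When ok is false, errorStr holds one "ERROR: ..." line per mismatch.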
TEST_CASE("/ManagementAPI/AutoQuorumChange/checkLocality") {
	wait(Future<Void>(Void()));

	std::vector<ProcessData> workers;
	std::vector<NetworkAddress> chosen;
	std::set<AddressExclusion> excluded;
	AutoQuorumChange change(5);

	for(int i = 0; i < 10; i++) {
		ProcessData data;
		auto dataCenter = std::to_string(i / 4 % 2);
		auto dataHall = dataCenter + std::to_string(i / 2 % 2);
		auto rack = dataHall + std::to_string(i % 2);
		auto machineId = rack + std::to_string(i);
		data.locality.set(LiteralStringRef("dcid"), StringRef(dataCenter));
		data.locality.set(LiteralStringRef("data_hall"), StringRef(dataHall));
		data.locality.set(LiteralStringRef("rack"), StringRef(rack));
		data.locality.set(LiteralStringRef("zoneid"), StringRef(rack));
		data.locality.set(LiteralStringRef("machineid"), StringRef(machineId));
		data.address.ip = IPAddress(i);

		workers.push_back(data);
	}

	auto noAssignIndex = deterministicRandom()->randomInt(0, workers.size());
	workers[noAssignIndex].processClass._class = ProcessClass::CoordinatorClass;

	change.addDesiredWorkers(chosen, workers, 5, excluded);
	std::map<StringRef, std::set<StringRef>> chosenValues;

	ASSERT(chosen.size() == 5);
	std::vector<StringRef> fields({ LiteralStringRef("dcid"), LiteralStringRef("data_hall"), LiteralStringRef("zoneid"), LiteralStringRef("machineid") });
	for(auto worker = chosen.begin(); worker != chosen.end(); worker++) {
		ASSERT(worker->ip.toV4() < workers.size());
		LocalityData data = workers[worker->ip.toV4()].locality;
		for(auto field = fields.begin(); field != fields.end(); field++) {
			chosenValues[*field].insert(data.get(*field).get());
		}
	}

	ASSERT(chosenValues[LiteralStringRef("dcid")].size() == 2);
	ASSERT(chosenValues[LiteralStringRef("data_hall")].size() == 4);
	ASSERT(chosenValues[LiteralStringRef("zoneid")].size() == 5);
	ASSERT(chosenValues[LiteralStringRef("machineid")].size() == 5);
	ASSERT(std::find(chosen.begin(), chosen.end(), workers[noAssignIndex].address) != chosen.end());

	return Void();
}