/*
 * ManagementAPI.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "flow/actorcompiler.h"
#include "ManagementAPI.h"

#include "SystemData.h"
#include "NativeAPI.h"
#include "CoordinationInterface.h"
#include "DatabaseContext.h"
#include "fdbrpc/simulator.h"
#include "StatusClient.h"
#include "flow/UnitTest.h"
#include "fdbrpc/ReplicationPolicy.h"
#include "fdbrpc/Replication.h"

static Future<vector<AddressExclusion>> getExcludedServers( Transaction* const& tr );

// Returns true when `s` is non-empty and strtol() (base 10) consumes every character of it.
// NOTE(review): strtol accepts leading whitespace and a leading sign, and overflow (errno) is not checked.
bool isInteger(const std::string& s) {
	if( s.empty() ) return false;
	char *p;
	strtol(s.c_str(), &p, 10); // only the end pointer is needed; the parsed value is discarded
	return (*p == 0);
}

// Defines the mapping between configuration names (as exposed by fdbcli, buildConfiguration()) and actual configuration parameters
//
// Returns the configuration keys (prefixed with configKeysPrefix) and values that the single token `mode` expands to.
// An empty map means the token was not recognized; buildConfiguration() reports that as UNKNOWN_OPTION.
std::map<std::string, std::string> configForToken( std::string const& mode ) {
	std::map<std::string, std::string> out;
	std::string p = configKeysPrefix.toString();

	if (mode == "new") {
		out[p+"initialized"]="1";
		return out;
	}

	size_t pos;

	// key:=value is unvalidated and unchecked
	pos = mode.find( ":=" );
	if( pos != std::string::npos ) {
		out[p+mode.substr(0, pos)] = mode.substr(pos+2);
		return out;
	}

	// key=value is constrained to a limited set of options and basic validation is performed
	pos = mode.find( "=" );
	if( pos != std::string::npos ) {
		std::string key = mode.substr(0, pos);
		std::string value = mode.substr(pos+1);

		if( (key == "logs" || key == "proxies" || key == "resolvers" || key == "remote_logs" || key == "satellite_logs") && isInteger(value) ) {
			out[p+key] = value;
		}

		if( key == "primary_dc" || key == "remote_dc" || key == "primary_satellite_dcs" || key == "remote_satellite_dcs" ) {
			out[p+key] = value;
		}

		// An unknown key or a non-integer count leaves `out` empty, i.e. "not recognized".
		return out;
	}

	Optional<KeyValueStoreType> storeType;
	if (mode == "ssd-1") {
		storeType= KeyValueStoreType::SSD_BTREE_V1;
	} else if (mode == "ssd" || mode == "ssd-2") {
		storeType = KeyValueStoreType::SSD_BTREE_V2;
	} else if (mode == "memory") {
		storeType= KeyValueStoreType::MEMORY;
	}
	// Add any new store types to fdbserver/workloads/ConfigureDatabase, too

	if (storeType.present()) {
		out[p+"log_engine"] = out[p+"storage_engine"] = format("%d", storeType.get());
		return out;
	}

	// Primary redundancy modes: replica counts plus serialized storage and tlog replication policies.
	std::string redundancy, log_replicas;
	IRepPolicyRef storagePolicy;
	IRepPolicyRef tLogPolicy;

	bool redundancySpecified = true;
	if (mode == "single") {
		redundancy="1";
		log_replicas="1";
		storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyOne());

	} else if(mode == "double" || mode == "fast_recovery_double") {
		redundancy="2";
		log_replicas="2";
		storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
	} else if(mode == "triple" || mode == "fast_recovery_triple") {
		redundancy="3";
		log_replicas="3";
		storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
	} else if(mode == "two_datacenter") {
		redundancy="3";
		log_replicas="3";
		storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
	} else if(mode == "three_datacenter") {
		redundancy="3";
		log_replicas="3";
		storagePolicy = tLogPolicy = IRepPolicyRef(new PolicyAnd({
			IRepPolicyRef(new PolicyAcross(3, "dcid", IRepPolicyRef(new PolicyOne()))),
			IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())))
		}));
	} else if(mode == "three_data_hall") {
		redundancy="3";
		log_replicas="4";
		storagePolicy = IRepPolicyRef(new PolicyAcross(3, "data_hall", IRepPolicyRef(new PolicyOne())));
		tLogPolicy = IRepPolicyRef(new PolicyAcross(2, "data_hall",
			IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
		));
	} else if(mode == "multi_dc") {
		redundancy="6";
		log_replicas="4";
		storagePolicy = IRepPolicyRef(new PolicyAcross(3, "dcid",
			IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
		));
		tLogPolicy = IRepPolicyRef(new PolicyAcross(2, "dcid",
			IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
		));
	} else
		redundancySpecified = false;
	if (redundancySpecified) {
		out[p+"storage_replicas"] =
			out[p+"storage_quorum"] = redundancy;
		out[p+"log_replicas"] = log_replicas;
		out[p+"log_anti_quorum"] = "0";

		BinaryWriter policyWriter(IncludeVersion());
		serializeReplicationPolicy(policyWriter, storagePolicy);
		out[p+"storage_replication_policy"] = policyWriter.toStringRef().toString();

		policyWriter = BinaryWriter(IncludeVersion());
		serializeReplicationPolicy(policyWriter, tLogPolicy);
		out[p+"log_replication_policy"] = policyWriter.toStringRef().toString();
		return out;
	}

	// Remote redundancy modes: remote replica counts, log router count and serialized remote tlog policy.
	std::string remote_redundancy, remote_log_replicas;
	IRepPolicyRef remoteTLogPolicy;
	bool remoteRedundancySpecified = true;
	if (mode == "remote_single") {
		remote_redundancy="1";
		remote_log_replicas="1";
		remoteTLogPolicy = IRepPolicyRef(new PolicyOne());
	} else if(mode == "remote_double") {
		remote_redundancy="2";
		remote_log_replicas="2";
		remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
	} else if(mode == "remote_triple") {
		remote_redundancy="3";
		remote_log_replicas="3";
		remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
	} else if(mode == "remote_three_data_hall") {
		remote_redundancy="3";
		remote_log_replicas="4";
		remoteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "data_hall",
			IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
		));
	} else
		remoteRedundancySpecified = false;
	if (remoteRedundancySpecified) {
		out[p+"remote_storage_replicas"] =
			out[p+"remote_storage_quorum"] = remote_redundancy;
		out[p+"remote_log_replicas"] =
			out[p+"log_routers"] = remote_log_replicas;

		BinaryWriter policyWriter(IncludeVersion());
		serializeReplicationPolicy(policyWriter, remoteTLogPolicy);
		out[p+"remote_log_policy"] = policyWriter.toStringRef().toString();
		return out;
	}

	// Satellite modes: satellite tlog counts, anti-quorum, usable DC count and serialized satellite tlog policy.
	std::string satellite_log_replicas, satellite_anti_quorum, satellite_usable_dcs;
	IRepPolicyRef satelliteTLogPolicy;
	bool satelliteRedundancySpecified = true;
	if (mode == "one_satellite_single") {
		satellite_anti_quorum="0";
		satellite_usable_dcs="1";
		satellite_log_replicas="1";
		satelliteTLogPolicy = IRepPolicyRef(new PolicyOne());
	} else if(mode == "one_satellite_double") {
		satellite_anti_quorum="0";
		satellite_usable_dcs="1";
		satellite_log_replicas="2";
		satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())));
	} else if(mode == "one_satellite_triple") {
		satellite_anti_quorum="0";
		satellite_usable_dcs="1";
		satellite_log_replicas="3";
		satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(3, "zoneid", IRepPolicyRef(new PolicyOne())));
	} else if(mode == "two_satellite_safe") {
		satellite_anti_quorum="0";
		satellite_usable_dcs="2";
		satellite_log_replicas="4";
		satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "dcid",
			IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
		));
	} else if(mode == "two_satellite_fast") {
		satellite_anti_quorum="2";
		satellite_usable_dcs="2";
		satellite_log_replicas="4";
		satelliteTLogPolicy = IRepPolicyRef(new PolicyAcross(2, "dcid",
			IRepPolicyRef(new PolicyAcross(2, "zoneid", IRepPolicyRef(new PolicyOne())))
		));
	} else
		satelliteRedundancySpecified = false;
	if (satelliteRedundancySpecified) {
		out[p+"satellite_anti_quorum"] = satellite_anti_quorum;
		out[p+"satellite_usable_dcs"] = satellite_usable_dcs;
		out[p+"satellite_log_replicas"] = satellite_log_replicas;

		BinaryWriter policyWriter(IncludeVersion());
		serializeReplicationPolicy(policyWriter, satelliteTLogPolicy);
		out[p+"satellite_log_policy"] = policyWriter.toStringRef().toString();
		return out;
	}

	return out;
}

// Expands every token with configForToken() and merges the results into outConf.
// Returns UNKNOWN_OPTION for an unrecognized token and CONFLICTING_OPTIONS when a token sets a key
// that is already present in outConf. If replica counts were given without an explicit replication
// policy, a default PolicyAcross(<count>, "zoneid", PolicyOne) policy is serialized into outConf.
ConfigurationResult::Type buildConfiguration( std::vector<StringRef> const& modeTokens, std::map<std::string, std::string>& outConf ) {
	for(auto it : modeTokens) {
		std::string mode = it.toString();
		auto m = configForToken( mode );
		if( !m.size() )
			return ConfigurationResult::UNKNOWN_OPTION;

		for( auto t = m.begin(); t != m.end(); ++t ) {
			if( outConf.count( t->first ) ) {
				return ConfigurationResult::CONFLICTING_OPTIONS;
			}
			outConf[t->first] = t->second;
		}
	}
	auto p = configKeysPrefix.toString();
	// NOTE(review): the unvalidated "key:=value" form can put a non-numeric value here, in which case
	// stoi throws std::invalid_argument rather than returning a ConfigurationResult.
	if(!outConf.count(p + "storage_replication_policy") && outConf.count(p + "storage_replicas")) {
		int storageCount = stoi(outConf[p + "storage_replicas"]);
		IRepPolicyRef storagePolicy = IRepPolicyRef(new PolicyAcross(storageCount, "zoneid", IRepPolicyRef(new PolicyOne())));
		BinaryWriter policyWriter(IncludeVersion());
		serializeReplicationPolicy(policyWriter, storagePolicy);
		outConf[p+"storage_replication_policy"] = policyWriter.toStringRef().toString();
	}

	if(!outConf.count(p + "log_replication_policy") && outConf.count(p + "log_replicas")) {
		int logCount = stoi(outConf[p + "log_replicas"]);
		IRepPolicyRef logPolicy = IRepPolicyRef(new PolicyAcross(logCount, "zoneid", IRepPolicyRef(new PolicyOne())));
		BinaryWriter policyWriter(IncludeVersion());
		serializeReplicationPolicy(policyWriter, logPolicy);
		outConf[p+"log_replication_policy"] = policyWriter.toStringRef().toString();
	}
	return ConfigurationResult::SUCCESS;
}

// Splits configMode on spaces and forwards the tokens to the vector overload.
// Empty tokens produced by leading or repeated spaces are skipped instead of being reported as UNKNOWN_OPTION.
// The tokens reference configMode's buffer, which outlives the call.
ConfigurationResult::Type buildConfiguration( std::string const& configMode, std::map<std::string, std::string>& outConf ) {
	std::vector<StringRef> modes;

	size_t p = 0;
	while ( p < configMode.size() ) {
		size_t end = configMode.find_first_of(' ', p);
		if (end == configMode.npos)
			end = configMode.size();
		if (end > p)
			modes.push_back( StringRef(configMode).substr(p, end-p) );
		p = end+1;
	}

	return buildConfiguration( modes, outConf );
}

// True when every option needed to create a new database (replica/quorum counts and both engines) is present.
bool isCompleteConfiguration( std::map<std::string, std::string> const& options ) {
	std::string p = configKeysPrefix.toString();

	return 	options.count( p+"log_replicas" ) == 1 &&
			options.count( p+"log_anti_quorum" ) == 1 &&
			options.count( p+"storage_quorum" ) == 1 &&
			options.count( p+"storage_replicas" ) == 1 &&
			options.count( p+"log_engine" ) == 1 &&
			options.count( p+"storage_engine" ) == 1;
}

// Writes every key/value in `m` in one transaction, retrying on retryable errors.
// When `m` contains the "initialized" key the call creates a database: it also writes a random
// \xff/init_id and requires a complete configuration. If that commit fails with not_committed or
// transaction_too_old, init_id is read back to decide between DATABASE_CREATED (our write landed)
// and DATABASE_ALREADY_CREATED (someone else's did).
ACTOR Future<ConfigurationResult::Type> changeConfig( Database cx, std::map<std::string, std::string> m ) {
	state StringRef initIdKey = LiteralStringRef( "\xff/init_id" );
	state Transaction tr(cx);

	if (!m.size())
		return ConfigurationResult::NO_OPTIONS_PROVIDED;

	// make sure we have essential configuration options
	std::string initKey = configKeysPrefix.toString() + "initialized";
	state bool creating = m.count( initKey ) != 0;
	if (creating) {
		m[initIdKey.toString()] = g_random->randomUniqueID().toString();
		if (!isCompleteConfiguration(m))
			return ConfigurationResult::INCOMPLETE_CONFIGURATION;
	}

	loop {
		try {
			tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
			tr.setOption( FDBTransactionOptions::LOCK_AWARE );

			if (creating) {
				tr.setOption( FDBTransactionOptions::INITIALIZE_NEW_DATABASE );
				tr.addReadConflictRange( singleKeyRange( initIdKey ) );
			} else if (m.size()) {
				// might be used in an emergency transaction, so make sure it is retry-self-conflicting and CAUSAL_WRITE_RISKY
				tr.setOption( FDBTransactionOptions::CAUSAL_WRITE_RISKY );
				tr.addReadConflictRange( singleKeyRange(m.begin()->first) );
			}

			for(auto i=m.begin(); i!=m.end(); ++i)
				tr.set( StringRef(i->first), StringRef(i->second) );

			Void _ = wait( tr.commit() );
			break;
		} catch (Error& e) {
			state Error e1(e);
			if ( (e.code() == error_code_not_committed || e.code() == error_code_transaction_too_old ) && creating) {
				// The database now exists. Determine whether we created it or it was already existing/created by someone else. The latter is an error.
				tr.reset();
				loop {
					try {
						tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
						tr.setOption(FDBTransactionOptions::LOCK_AWARE);

						Optional<Value> v = wait( tr.get( initIdKey ) );
						if (v != m[initIdKey.toString()])
							return ConfigurationResult::DATABASE_ALREADY_CREATED;
						else
							return ConfigurationResult::DATABASE_CREATED;
					} catch (Error& e2) {
						Void _ = wait( tr.onError(e2) );
					}
				}
			}
			Void _ = wait( tr.onError(e1) );
		}
	}
	return ConfigurationResult::SUCCESS;
}

// Computes a "configure auto" proposal from a status document: suggested log/proxy/resolver counts,
// a replication upgrade from "single" to "double", and which processes to promote to the transaction class.
// Returns a default-constructed ConfigureAutoResult when required status fields are missing, the
// redundancy mode is not supported, machines report more than one datacenter, fewer than 10 processes
// are usable, or no acceptable transaction-class assignment is found.
ConfigureAutoResult parseConfig( StatusObject const& status ) {
	ConfigureAutoResult result;
	StatusObjectReader statusObj(status);

	StatusObjectReader statusObjCluster;
	if (!statusObj.get("cluster", statusObjCluster))
		return ConfigureAutoResult();

	StatusObjectReader statusObjConfig;
	if (!statusObjCluster.get("configuration", statusObjConfig))
		return ConfigureAutoResult();

	if (!statusObjConfig.get("redundancy.factor", result.old_replication))
		return ConfigureAutoResult();

	result.auto_replication = result.old_replication;

	// Only the log replica count feeds into the calculations below.
	int log_replication;
	if( result.old_replication == "single" ) {
		result.auto_replication = "double";
		log_replication = 2;
	} else if( result.old_replication == "double" || result.old_replication == "fast_recovery_double" ) {
		log_replication = 2;
	} else if( result.old_replication == "triple" || result.old_replication == "fast_recovery_triple" ) {
		log_replication = 3;
	} else if( result.old_replication == "two_datacenter" ) {
		log_replication = 3;
	} else if( result.old_replication == "three_datacenter" ) {
		log_replication = 3;
	} else
		return ConfigureAutoResult();

	StatusObjectReader machinesMap;
	if (!statusObjCluster.get("machines", machinesMap))
		return ConfigureAutoResult();

	std::map<std::string, std::string> machineid_dcid;
	std::set<std::string> datacenters;
	int machineCount = 0;
	for (auto mach : machinesMap.obj()) {
		StatusObjectReader machine(mach.second);
		std::string dcId;
		if (machine.get("datacenter_id", dcId)) {
			machineid_dcid[mach.first] = dcId;
			datacenters.insert(dcId);
		}
		machineCount++;
	}

	result.machines = machineCount;

	if(datacenters.size() > 1)
		return ConfigureAutoResult();

	StatusObjectReader processesMap;
	if (!statusObjCluster.get("processes", processesMap))
		return ConfigureAutoResult();

	std::set<std::string> oldMachinesWithTransaction;
	int oldTransactionProcesses = 0;
	std::map<std::string, std::vector<std::pair<NetworkAddress, ProcessClass>>> machine_processes;
	int processCount = 0;
	for (auto proc : processesMap.obj()){
		StatusObjectReader process(proc.second);
		if (!process.has("excluded") || !process.last().get_bool()) {
			std::string addrStr;
			if (!process.get("address", addrStr))
				return ConfigureAutoResult();
			std::string class_source;
			if (!process.get("class_source", class_source))
				return ConfigureAutoResult();
			std::string class_type;
			if (!process.get("class_type", class_type))
				return ConfigureAutoResult();
			std::string machineId;
			if (!process.get("machine_id", machineId))
				return ConfigureAutoResult();

			NetworkAddress addr = NetworkAddress::parse(addrStr);
			ProcessClass processClass(class_type, class_source);

			if(processClass.classType() == ProcessClass::TransactionClass || processClass.classType() == ProcessClass::LogClass) {
				oldMachinesWithTransaction.insert(machineId);
			}

			if(processClass.classType() == ProcessClass::TransactionClass || processClass.classType() == ProcessClass::ProxyClass || processClass.classType() == ProcessClass::ResolutionClass || processClass.classType() == ProcessClass::StatelessClass || processClass.classType() == ProcessClass::LogClass) {
				oldTransactionProcesses++;
			}

			// Classes previously assigned automatically are reset, so this run may reassign them.
			if( processClass.classSource() == ProcessClass::AutoSource ) {
				processClass = ProcessClass(ProcessClass::UnsetClass, ProcessClass::CommandLineSource);
				result.address_class[addr] = processClass;
			}

			if( processClass.classType() != ProcessClass::TesterClass ) {
				machine_processes[machineId].push_back(std::make_pair(addr, processClass));
				processCount++;
			}
		}
	}

	result.processes = processCount;
	result.old_processes_with_transaction = oldTransactionProcesses;
	result.old_machines_with_transaction = oldMachinesWithTransaction.size();

	// Machines ordered by (process count, machine id), so machines with fewer processes are considered first.
	std::map<std::pair<int, std::string>, std::vector<std::pair<NetworkAddress, ProcessClass>>> count_processes;
	for( auto& it : machine_processes ) {
		count_processes[std::make_pair(it.second.size(), it.first)] = it.second;
	}

	std::set<std::string> machinesWithTransaction;
	std::set<std::string> machinesWithStorage;
	int totalTransactionProcesses = 0;
	int existingProxyCount = 0;
	int existingResolverCount = 0;
	int existingStatelessCount = 0;
	for( auto& it : machine_processes ) {
		for(auto& proc : it.second ) {
			if(proc.second == ProcessClass::TransactionClass || proc.second == ProcessClass::LogClass) {
				totalTransactionProcesses++;
				machinesWithTransaction.insert(it.first);
			}
			if(proc.second == ProcessClass::StatelessClass) {
				existingStatelessCount++;
			}
			if(proc.second == ProcessClass::ProxyClass) {
				existingProxyCount++;
			}
			if(proc.second == ProcessClass::ResolutionClass) {
				existingResolverCount++;
			}
			if(proc.second == ProcessClass::StorageClass) {
				machinesWithStorage.insert(it.first);
			}
			if(proc.second == ProcessClass::UnsetClass && proc.second.classSource() == ProcessClass::DBSource) {
				machinesWithStorage.insert(it.first);
			}
		}
	}

	if( processCount < 10 )
		return ConfigureAutoResult();

	result.desired_resolvers = 1;
	int resolverCount;
	if (!statusObjConfig.get("resolvers", result.old_resolvers)) {
		result.old_resolvers = CLIENT_KNOBS->DEFAULT_AUTO_RESOLVERS;
		statusObjConfig.get("auto_resolvers", result.old_resolvers);
		result.auto_resolvers = result.desired_resolvers;
		resolverCount = result.auto_resolvers;
	} else {
		result.auto_resolvers = result.old_resolvers;
		resolverCount = result.old_resolvers;
	}

	result.desired_proxies = std::min( 12, processCount / 15 );
	int proxyCount;
	if (!statusObjConfig.get("proxies", result.old_proxies)) {
		result.old_proxies = CLIENT_KNOBS->DEFAULT_AUTO_PROXIES;
		statusObjConfig.get("auto_proxies", result.old_proxies);
		result.auto_proxies = result.desired_proxies;
		proxyCount = result.auto_proxies;
	} else {
		result.auto_proxies = result.old_proxies;
		proxyCount = result.old_proxies;
	}

	result.desired_logs = std::min( 12, processCount / 20 );
	result.desired_logs = std::max( result.desired_logs, log_replication + 1 );
	result.desired_logs = std::min<int>( result.desired_logs, machine_processes.size() );
	int logCount;
	if (!statusObjConfig.get("logs", result.old_logs)) {
		result.old_logs = CLIENT_KNOBS->DEFAULT_AUTO_LOGS;
		statusObjConfig.get("auto_logs", result.old_logs);
		result.auto_logs = result.desired_logs;
		logCount = result.auto_logs;
	} else {
		result.auto_logs = result.old_logs;
		logCount = result.old_logs;
	}

	logCount = std::max(logCount, log_replication);

	totalTransactionProcesses += std::min(existingProxyCount, proxyCount);
	totalTransactionProcesses += std::min(existingResolverCount, resolverCount);
	totalTransactionProcesses += existingStatelessCount;

	//if one process on a machine is transaction class, make them all transaction class
	for( auto& it : count_processes ) {
		if( machinesWithTransaction.count(it.first.second) && !machinesWithStorage.count(it.first.second) ) {
			for(auto& proc : it.second ) {
				if(proc.second == ProcessClass::UnsetClass && proc.second.classSource() == ProcessClass::CommandLineSource) {
					result.address_class[proc.first] = ProcessClass(ProcessClass::TransactionClass, ProcessClass::AutoSource);
					totalTransactionProcesses++;
				}
			}
		}
	}

	int desiredTotalTransactionProcesses = logCount + resolverCount + proxyCount;

	//add machines with all transaction class until we have enough processes and enough machines
	for( auto& it : count_processes ) {
		if( machinesWithTransaction.size() >= logCount && totalTransactionProcesses >= desiredTotalTransactionProcesses )
			break;

		if( !machinesWithTransaction.count(it.first.second) && !machinesWithStorage.count(it.first.second) ) {
			for(auto& proc : it.second ) {
				if(proc.second == ProcessClass::UnsetClass && proc.second.classSource() == ProcessClass::CommandLineSource) {
					ASSERT(proc.second != ProcessClass::TransactionClass);
					result.address_class[proc.first] = ProcessClass(ProcessClass::TransactionClass, ProcessClass::AutoSource);
					totalTransactionProcesses++;
					machinesWithTransaction.insert(it.first.second);
				}
			}
		}
	}

	if( machinesWithTransaction.size() < logCount || totalTransactionProcesses < desiredTotalTransactionProcesses )
		return ConfigureAutoResult();

	result.auto_processes_with_transaction = totalTransactionProcesses;
	result.auto_machines_with_transaction = machinesWithTransaction.size();

	// Refuse proposals that would turn more than a third of the processes into transaction processes.
	if( 3*totalTransactionProcesses > processCount )
		return ConfigureAutoResult();

	return result;
}

// Applies a ConfigureAutoResult produced by parseConfig(). It writes or clears the process class
// override for every address in conf.address_class and bumps processClassChangeKey. It also writes
// auto_logs / auto_proxies / auto_resolvers when they differ from the old values, plus the
// replication-mode keys when auto_replication changed.
ACTOR Future<ConfigurationResult::Type> autoConfig( Database cx, ConfigureAutoResult conf ) {
	state Transaction tr(cx);

	if(!conf.address_class.size())
		return ConfigurationResult::INCOMPLETE_CONFIGURATION; //FIXME: correct return type

	loop {
		try {
			tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
			tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
			tr.setOption( FDBTransactionOptions::LOCK_AWARE );

			vector<ProcessData> workers = wait( getWorkers(&tr) );
			std::map<NetworkAddress, Optional<Standalone<StringRef>>> address_processId;
			for(auto& w : workers) {
				address_processId[w.address] = w.locality.processId();
			}

			// NOTE(review): an address that is no longer in the worker list maps to an absent Optional here,
			// and .get() is called on it unconditionally -- confirm that is acceptable.
			for(auto& it : conf.address_class) {
				if( it.second.classSource() == ProcessClass::CommandLineSource ) {
					tr.clear(processClassKeyFor(address_processId[it.first].get()));
				} else {
					tr.set(processClassKeyFor(address_processId[it.first].get()), processClassValue(it.second));
				}
			}

			if(conf.address_class.size())
				tr.set(processClassChangeKey, g_random->randomUniqueID().toString());

			if(conf.auto_logs != conf.old_logs)
				tr.set(configKeysPrefix.toString() + "auto_logs", format("%d", conf.auto_logs));

			if(conf.auto_proxies != conf.old_proxies)
				tr.set(configKeysPrefix.toString() + "auto_proxies", format("%d", conf.auto_proxies));

			if(conf.auto_resolvers != conf.old_resolvers)
				tr.set(configKeysPrefix.toString() + "auto_resolvers", format("%d", conf.auto_resolvers));

			if( conf.auto_replication != conf.old_replication ) {
				std::vector<StringRef> modes;
				modes.push_back(conf.auto_replication);
				std::map<std::string, std::string> m;
				auto r = buildConfiguration( modes, m );
				if (r != ConfigurationResult::SUCCESS)
					return r;

				for(auto& kv : m)
					tr.set(kv.first, kv.second);
			}

			Void _ = wait( tr.commit() );
			return ConfigurationResult::SUCCESS;
		} catch( Error &e ) {
			Void _ = wait( tr.onError(e));
		}
	}
}

// Entry point for fdbcli-style configuration. "auto" as the first token with a precomputed proposal
// applies that proposal; otherwise the tokens are expanded and written by changeConfig(cx, map).
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::vector<StringRef> const& modes, Optional<ConfigureAutoResult> const& conf ) {
	if( modes.size() && modes[0] == LiteralStringRef("auto") && conf.present() ) {
		return autoConfig(cx, conf.get());
	}

	std::map<std::string, std::string> m;
	auto r = buildConfiguration( modes, m );
	if (r != ConfigurationResult::SUCCESS)
		return r;
	return changeConfig(cx, m);
}

// Same as above, for a single space-separated configuration string; the string is also traced.
Future<ConfigurationResult::Type> changeConfig( Database const& cx, std::string const& modes ) {
	TraceEvent("ChangeConfig").detail("Mode", modes);
	std::map<std::string, std::string> m;
	auto r = buildConfiguration( modes, m );
	if (r != ConfigurationResult::SUCCESS)
		return r;
	return changeConfig(cx, m);
}

// Reads the worker list and the stored process class overrides within `tr`. A worker takes the stored
// class when that class came from the database or when the worker itself reports an unset class.
// Tester-class workers are omitted from the result.
ACTOR Future<vector<ProcessData>> getWorkers( Transaction* tr ) {
	state Future<Standalone<RangeResultRef>> processClasses = tr->getRange( processClassKeys, CLIENT_KNOBS->TOO_MANY );
	state Future<Standalone<RangeResultRef>> processData = tr->getRange( workerListKeys, CLIENT_KNOBS->TOO_MANY );

	Void _ = wait( success(processClasses) && success(processData) );
	ASSERT( !processClasses.get().more && processClasses.get().size() < CLIENT_KNOBS->TOO_MANY );
	ASSERT( !processData.get().more && processData.get().size() < CLIENT_KNOBS->TOO_MANY );

	std::map<Optional<Standalone<StringRef>>,ProcessClass> id_class;
	for( int i = 0; i < processClasses.get().size(); i++ ) {
		id_class[decodeProcessClassKey(processClasses.get()[i].key)] = decodeProcessClassValue(processClasses.get()[i].value);
	}

	std::vector<ProcessData> results;

	for( int i = 0; i < processData.get().size(); i++ ) {
		ProcessData data = decodeWorkerListValue(processData.get()[i].value);
		ProcessClass processClass = id_class[data.locality.processId()];

		if(processClass.classSource() == ProcessClass::DBSource || data.processClass.classType() == ProcessClass::UnsetClass)
			data.processClass = processClass;

		if(data.processClass.classType() != ProcessClass::TesterClass)
			results.push_back(data);
	}

	return results;
}

// Retrying wrapper around getWorkers(Transaction*) using a fresh system-priority, lock-aware transaction.
ACTOR Future<vector<ProcessData>> getWorkers( Database cx ) {
	state Transaction tr(cx);
	loop {
		try {
			tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS );
			tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );  // necessary?
			tr.setOption( FDBTransactionOptions::LOCK_AWARE );
			vector<ProcessData> workers = wait( getWorkers(&tr) );
			return workers;
		} catch (Error& e) {
			Void _ = wait( tr.onError(e) );
		}
	}
}

// Returns the coordinators listed in the connection string stored at coordinatorsKey, or an empty vector if the key is absent.
ACTOR Future<std::vector<NetworkAddress>> getCoordinators( Database cx ) {
	state Transaction tr(cx);
	loop {
		try {
			tr.setOption( FDBTransactionOptions::LOCK_AWARE );
			Optional<Value> currentKey = wait( tr.get( coordinatorsKey ) );
			if (!currentKey.present())
				return std::vector<NetworkAddress>();

			return ClusterConnectionString( currentKey.get().toString() ).coordinators();
		} catch (Error& e) {
			Void _ = wait( tr.onError(e) );
		}
	}
}

// Replaces the coordinator set (and optionally the cluster key name) stored at coordinatorsKey with the
// one chosen by `change`. Every proposed coordinator must answer a GetLeaderRequest within 5 seconds,
// otherwise COORDINATOR_UNREACHABLE is returned. The commit itself is expected to fail (see the ASSERT);
// the retry then sees the new value already in place and returns SUCCESS.
ACTOR Future<CoordinatorsResult::Type> changeQuorum( Database cx, Reference<IQuorumChange> change ) {
	state Transaction tr(cx);
	state int retries = 0;

	loop {
		try {
			tr.setOption( FDBTransactionOptions::LOCK_AWARE );
			Optional<Value> currentKey = wait( tr.get( coordinatorsKey ) );

			if (!currentKey.present())
				return CoordinatorsResult::BAD_DATABASE_STATE;  // Someone deleted this key entirely?

			state ClusterConnectionString old( currentKey.get().toString() );
			if ( cx->cluster && old.clusterKeyName().toString() != cx->cluster->getConnectionFile()->getConnectionString().clusterKeyName() )
				return CoordinatorsResult::BAD_DATABASE_STATE;  // Someone changed the "name" of the database??

			state CoordinatorsResult::Type result = CoordinatorsResult::SUCCESS;
			std::vector<NetworkAddress> _desiredCoordinators = wait( change->getDesiredCoordinators( &tr, old.coordinators(), Reference<ClusterConnectionFile>(new ClusterConnectionFile(old)), result ) );
			std::vector<NetworkAddress> desiredCoordinators = _desiredCoordinators;
			if (result != CoordinatorsResult::SUCCESS)
				return result;
			if (!desiredCoordinators.size())
				return CoordinatorsResult::INVALID_NETWORK_ADDRESSES;
			std::sort(desiredCoordinators.begin(), desiredCoordinators.end());

			std::string newName = change->getDesiredClusterKeyName();
			if (newName.empty()) newName = old.clusterKeyName().toString();

			if ( old.coordinators() == desiredCoordinators && old.clusterKeyName() == newName)
				return retries ? CoordinatorsResult::SUCCESS : CoordinatorsResult::SAME_NETWORK_ADDRESSES;

			state ClusterConnectionString conn( desiredCoordinators, StringRef( newName + ':' + g_random->randomAlphaNumeric( 32 ) ) );

			// In simulation, protect a majority of the new coordinators from being killed.
			if(g_network->isSimulated()) {
				for(int i = 0; i < (desiredCoordinators.size()/2)+1; i++) {
					auto address = NetworkAddress(desiredCoordinators[i].ip,desiredCoordinators[i].port,true,false);
					g_simulator.protectedAddresses.insert(address);
					TraceEvent("ProtectCoordinator").detail("Address", address).backtrace();
				}
			}

			TraceEvent("AttemptingQuorumChange").detail("FromCS", old.toString()).detail("ToCS", conn.toString());
			TEST(old.clusterKeyName() != conn.clusterKeyName());  // Quorum change with new name
			TEST(old.clusterKeyName() == conn.clusterKeyName()); // Quorum change with unchanged name

			vector<Future<Optional<LeaderInfo>>> leaderServers;
			ClientCoordinators coord( Reference<ClusterConnectionFile>( new ClusterConnectionFile( conn ) ) );
			for( int i = 0; i < coord.clientLeaderServers.size(); i++ )
				leaderServers.push_back( retryBrokenPromise( coord.clientLeaderServers[i].getLeader, GetLeaderRequest( coord.clusterKey, UID() ), TaskCoordinationReply ) );
			choose {
				when( Void _ = wait( waitForAll( leaderServers ) ) ) {}
				when( Void _ = wait( delay(5.0) ) ) {
					return CoordinatorsResult::COORDINATOR_UNREACHABLE;
				}
			}

			tr.set( coordinatorsKey, conn.toString() );

			Void _ = wait( tr.commit() );
			ASSERT( false ); //commit should fail, but the value has changed
		} catch (Error& e) {
			TraceEvent("RetryQuorumChange").error(e).detail("Retries", retries);
			Void _ = wait( tr.onError(e) );
			++retries;
		}
	}
}

// Quorum change to an explicitly given list of coordinators.
struct SpecifiedQuorumChange : IQuorumChange {
	vector<NetworkAddress> desired;
	explicit SpecifiedQuorumChange( vector<NetworkAddress> const& desired ) : desired(desired) {}
	virtual Future<vector<NetworkAddress>> getDesiredCoordinators( Transaction* tr, vector<NetworkAddress> oldCoordinators, Reference<ClusterConnectionFile>, CoordinatorsResult::Type& ) {
		return desired;
	}
};
Reference<IQuorumChange> specifiedQuorumChange(vector<NetworkAddress> const& addresses) { return Reference<IQuorumChange>(new SpecifiedQuorumChange(addresses)); }

// Quorum change that keeps the current coordinators.
struct NoQuorumChange : IQuorumChange {
	virtual Future<vector<NetworkAddress>> getDesiredCoordinators( Transaction* tr, vector<NetworkAddress> oldCoordinators, Reference<ClusterConnectionFile>, CoordinatorsResult::Type& ) {
		return oldCoordinators;
	}
};
Reference<IQuorumChange> noQuorumChange() { return Reference<IQuorumChange>(new NoQuorumChange); }

// Sets a new cluster key name and delegates coordinator selection to `otherChange`.
struct NameQuorumChange : IQuorumChange {
	std::string newName;
	Reference<IQuorumChange> otherChange;
	explicit NameQuorumChange( std::string const& newName, Reference<IQuorumChange> const& otherChange ) : newName(newName), otherChange(otherChange) {}
	virtual Future<vector<NetworkAddress>> getDesiredCoordinators( Transaction* tr, vector<NetworkAddress> oldCoordinators, Reference<ClusterConnectionFile> cf, CoordinatorsResult::Type& t ) {
		return otherChange->getDesiredCoordinators(tr, oldCoordinators, cf, t);
	}
	virtual std::string getDesiredClusterKeyName() {
		return newName;
	}
};
Reference<IQuorumChange> nameQuorumChange(std::string const& name, Reference<IQuorumChange> const& other) {
	return Reference<IQuorumChange>(new NameQuorumChange( name, other ));
}

// Chooses coordinators automatically. `desired == -1` derives the count from the configured redundancy
// as 2 * min(storage_replicas, log_replicas) - 1.
struct AutoQuorumChange : IQuorumChange {
	int desired;
	explicit AutoQuorumChange( int desired ) : desired(desired) {}

	virtual Future<vector<NetworkAddress>> getDesiredCoordinators( Transaction* tr, vector<NetworkAddress> oldCoordinators, Reference<ClusterConnectionFile> ccf, CoordinatorsResult::Type& err ) {
		return getDesired( this, tr, oldCoordinators, ccf, &err );
	}

	// min(storage_replicas, log_replicas) from the stored configuration.
	// NOTE(review): .get() assumes both configuration keys are present.
	ACTOR static Future<int> getRedundancy( AutoQuorumChange* self, Transaction* tr ) {
		state Future<Optional<Value>> fStorageReplicas = tr->get( LiteralStringRef("storage_replicas").withPrefix( configKeysPrefix ) );
		state Future<Optional<Value>> fLogReplicas = tr->get( LiteralStringRef("log_replicas").withPrefix( configKeysPrefix ) );
		Void _ = wait( success( fStorageReplicas ) && success( fLogReplicas ) );
		int redundancy = std::min(
			atoi( fStorageReplicas.get().get().toString().c_str() ),
			atoi( fLogReplicas.get().get().toString().c_str() ) );

		return redundancy;
	}

	// The current coordinators are kept when all of the following hold:
	// - there are at least desiredCount of them, and an odd number;
	// - all of them respond within IS_ACCEPTABLE_DELAY and know this database;
	// - none of them is excluded;
	// - no two of them share an IP.
	ACTOR static Future<bool> isAcceptable( AutoQuorumChange* self, Transaction* tr, vector<NetworkAddress> oldCoordinators, Reference<ClusterConnectionFile> ccf, int desiredCount, std::set<AddressExclusion>* excluded ) {
		// Are there enough coordinators for the redundancy level?
		if (oldCoordinators.size() < desiredCount) return false;
		if (oldCoordinators.size() % 2 != 1) return false;

		// Check availability
		ClientCoordinators coord(ccf);
		vector<Future<Optional<LeaderInfo>>> leaderServers;
		for( int i = 0; i < coord.clientLeaderServers.size(); i++ )
			leaderServers.push_back( retryBrokenPromise( coord.clientLeaderServers[i].getLeader, GetLeaderRequest( coord.clusterKey, UID() ), TaskCoordinationReply ) );
		Optional<vector<Optional<LeaderInfo>>> results = wait( timeout( getAll(leaderServers), CLIENT_KNOBS->IS_ACCEPTABLE_DELAY ) );
		if (!results.present()) return false;  // Not all responded
		for(auto& r : results.get())
			if (!r.present())
				return false;   // Coordinator doesn't know about this database?

		// Check exclusions
		for(auto& c : oldCoordinators) {
			if (addressExcluded(*excluded, c)) return false;
		}

		// Check locality
		// FIXME: Actual locality!
		std::sort( oldCoordinators.begin(), oldCoordinators.end() );
		for(int i=1; i<oldCoordinators.size(); i++)
			if (oldCoordinators[i-1].ip == oldCoordinators[i].ip)
				return false;  // Multiple coordinators share an IP

		return true; // The status quo seems fine
	}

	// Keeps the current coordinators when they are all known workers on distinct zones and isAcceptable()
	// agrees. Otherwise it picks a new, locality-spread set. When too few eligible workers exist, the result
	// never drops below the current coordinator count (NOT_ENOUGH_MACHINES otherwise) and is trimmed to an
	// odd size where possible.
	ACTOR static Future<vector<NetworkAddress>> getDesired( AutoQuorumChange* self, Transaction* tr, vector<NetworkAddress> oldCoordinators, Reference<ClusterConnectionFile> ccf, CoordinatorsResult::Type* err ) {
		state int desiredCount = self->desired;

		if(desiredCount == -1) {
			int redundancy = wait( getRedundancy( self, tr ) );
			desiredCount = redundancy*2 - 1;
		}

		std::vector<AddressExclusion> excl = wait( getExcludedServers( tr ) );
		state std::set<AddressExclusion> excluded( excl.begin(), excl.end() );

		vector<ProcessData> _workers = wait(getWorkers(tr));
		state vector<ProcessData> workers = _workers;

		std::map<NetworkAddress, LocalityData> addr_locality;
		for(auto w : workers)
			addr_locality[w.address] = w.locality;

		// since we don't have the locality data for oldCoordinators:
		//     check if every old coordinator is in the workers vector and
		//     check if multiple old coordinators map to the same locality data (same machine)
		bool checkAcceptable = true;
		std::set<Optional<Standalone<StringRef>>> checkDuplicates;
		if (workers.size()){
			for (auto addr : oldCoordinators) {
				auto findResult = addr_locality.find(addr);
				if (findResult == addr_locality.end() || checkDuplicates.count(findResult->second.zoneId())){
					checkAcceptable = false;
					break;
				}
				checkDuplicates.insert(findResult->second.zoneId());
			}
		}

		if (checkAcceptable){
			bool ok = wait(isAcceptable(self, tr, oldCoordinators, ccf, desiredCount, &excluded));
			if (ok) return oldCoordinators;
		}

		std::vector<NetworkAddress> chosen;
		self->addDesiredWorkers(chosen, workers, desiredCount, excluded);

		if (chosen.size() < desiredCount) {
			if (chosen.size() < oldCoordinators.size()) {
				TraceEvent("NotEnoughMachinesForCoordinators").detail("EligibleWorkers", workers.size()).detail("DesiredCoordinators", desiredCount).detail("CurrentCoordinators", oldCoordinators.size());
				*err = CoordinatorsResult::NOT_ENOUGH_MACHINES;
				return vector<NetworkAddress>();
			}
			// Size the result from what was actually chosen, not from workers.size(): excluded workers and
			// workers sharing a machine are never chosen, and growing `chosen` past its size would pad it with
			// default-constructed addresses. Because chosen.size() >= oldCoordinators.size() here, this only truncates.
			int chosenCount = chosen.size();
			int oddChosenCount = (chosenCount % 2 == 1) ? chosenCount : chosenCount - 1;
			desiredCount = std::max<int>(oldCoordinators.size(), oddChosenCount);
			chosen.resize(desiredCount);
		}

		return chosen;
	}

	// Greedily picks up to desiredCount non-excluded workers (in random order), spreading them across dcid,
	// data_hall, zoneid and machineid. Each field starts by allowing one worker per value. When no worker fits,
	// the first field still below its hard limit (machineid: 1, others: desiredCount) is relaxed by one.
	// Selection stops when nothing can be relaxed.
	void addDesiredWorkers(vector<NetworkAddress>& chosen, const vector<ProcessData>& workers, int desiredCount, const std::set<AddressExclusion>& excluded) {
		vector<ProcessData> remainingWorkers(workers);
		g_random->randomShuffle(remainingWorkers);

		std::map<StringRef, int> maxCounts;
		std::map<StringRef, std::map<StringRef, int>> currentCounts;
		std::map<StringRef, int> hardLimits;

		vector<StringRef> fields({
			LiteralStringRef("dcid"),
			LiteralStringRef("data_hall"),
			LiteralStringRef("zoneid"),
			LiteralStringRef("machineid")
		});

		for(auto field = fields.begin(); field != fields.end(); field++) {
			if(field->toString() == "machineid") {
				hardLimits[*field] = 1;
			}
			else {
				hardLimits[*field] = desiredCount;
			}
		}

		while(chosen.size() < desiredCount) {
			bool found = false;
			for (auto worker = remainingWorkers.begin(); worker != remainingWorkers.end(); worker++) {
				if(addressExcluded(excluded, worker->address)) {
					continue;
				}
				bool valid = true;
				for(auto field = fields.begin(); field != fields.end(); field++) {
					if(maxCounts[*field] == 0) {
						maxCounts[*field] = 1;
					}
					auto value = worker->locality.get(*field).orDefault(LiteralStringRef(""));
					auto currentCount = currentCounts[*field][value];
					if(currentCount >= maxCounts[*field]) {
						valid = false;
						break;
					}
				}
				if(valid) {
					for(auto field = fields.begin(); field != fields.end(); field++) {
						auto value = worker->locality.get(*field).orDefault(LiteralStringRef(""));
						currentCounts[*field][value] += 1;
					}
					chosen.push_back(worker->address);
					remainingWorkers.erase(worker);
					found = true;
					break;
				}
			}
			if(!found) {
				bool canIncrement = false;
				for(auto field = fields.begin(); field != fields.end(); field++) {
					if(maxCounts[*field] < hardLimits[*field]) {
						maxCounts[*field] += 1;
						canIncrement = true;
						break;
					}
				}
				if(!canIncrement) {
					break;
				}
			}
		}
	}
};
Reference<IQuorumChange> autoQuorumChange( int desired ) { return Reference<IQuorumChange>(new AutoQuorumChange(desired)); }

// Marks every address in `servers` as excluded and bumps excludedServersVersionKey. A read conflict on that key
// makes concurrent include/exclude calls conflict.
ACTOR Future<Void> excludeServers( Database cx, vector<AddressExclusion> servers ) {
	state Transaction tr(cx);
	state std::string versionKey = g_random->randomUniqueID().toString();
	loop {
		try {
			tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
			tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
			tr.setOption( FDBTransactionOptions::LOCK_AWARE );

			tr.addReadConflictRange( singleKeyRange(excludedServersVersionKey) ); //To conflict with parallel includeServers
			tr.set( excludedServersVersionKey, versionKey );
			for(auto& s : servers)
				tr.set( encodeExcludedServersKey(s), StringRef() );

			TraceEvent("ExcludeServersCommit").detail("Servers", describe(servers));

			Void _ = wait( tr.commit() );
			return Void();
		} catch (Error& e) {
			Void _ = wait( tr.onError(e) );
		}
	}
}

// Removes exclusions. An invalid AddressExclusion clears every exclusion; a whole-machine exclusion also clears all
// port-level exclusions for that IP. Bumps excludedServersVersionKey like excludeServers().
ACTOR Future<Void> includeServers( Database cx, vector<AddressExclusion> servers ) {
	state Transaction tr(cx);
	state std::string versionKey = g_random->randomUniqueID().toString();
	loop {
		try {
			tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
			tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
			tr.setOption( FDBTransactionOptions::LOCK_AWARE );

			// includeServers might be used in an emergency transaction, so make sure it is retry-self-conflicting and CAUSAL_WRITE_RISKY
			tr.setOption( FDBTransactionOptions::CAUSAL_WRITE_RISKY );
			tr.addReadConflictRange( singleKeyRange(excludedServersVersionKey) );

			tr.set( excludedServersVersionKey, versionKey );
			for(auto& s : servers ) {
				if (!s.isValid()) {
					tr.clear( excludedServersKeys );
				} else if (s.isWholeMachine()) {
					// Eliminate both any ip-level exclusion (1.2.3.4) and any port-level exclusions (1.2.3.4:5)
					tr.clear( KeyRangeRef( encodeExcludedServersKey(s), encodeExcludedServersKey(s) + char(':'+1) ) );
				} else {
					tr.clear( encodeExcludedServersKey(s) );
				}
			}

			TraceEvent("IncludeServersCommit").detail("Servers", describe(servers));

			Void _ = wait( tr.commit() );
			return Void();
		} catch (Error& e) {
			TraceEvent("IncludeServersError").error(e, true);
			Void _ = wait( tr.onError(e) );
		}
	}
}

ACTOR Future<Void> setClass( Database cx, AddressExclusion server, ProcessClass processClass ) {
	state Transaction tr(cx);

	loop {
		try {
			tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
			tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
			tr.setOption( FDBTransactionOptions::LOCK_AWARE );

			vector<ProcessData> workers =
wait( getWorkers(&tr) ); bool foundChange = false; for(int i = 0; i < workers.size(); i++) { if( server.excludes(workers[i].address) ) { if(processClass.classType() != ProcessClass::InvalidClass) tr.set(processClassKeyFor(workers[i].locality.processId().get()), processClassValue(processClass)); else tr.clear(processClassKeyFor(workers[i].locality.processId().get())); foundChange = true; } } if(foundChange) tr.set(processClassChangeKey, g_random->randomUniqueID().toString()); Void _ = wait( tr.commit() ); return Void(); } catch (Error& e) { Void _ = wait( tr.onError(e) ); } } } ACTOR static Future> getExcludedServers( Transaction* tr ) { Standalone r = wait( tr->getRange( excludedServersKeys, CLIENT_KNOBS->TOO_MANY ) ); ASSERT( !r.more && r.size() < CLIENT_KNOBS->TOO_MANY ); vector exclusions; for(auto i = r.begin(); i != r.end(); ++i) { auto a = decodeExcludedServersKey( i->key ); if (a.isValid()) exclusions.push_back( a ); } return exclusions; } ACTOR Future> getExcludedServers( Database cx ) { state Transaction tr(cx); loop { try { tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS ); tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); // necessary? 
tr.setOption( FDBTransactionOptions::LOCK_AWARE ); vector exclusions = wait( getExcludedServers(&tr) ); return exclusions; } catch (Error& e) { Void _ = wait( tr.onError(e) ); } } } ACTOR Future setDDMode( Database cx, int mode ) { state Transaction tr(cx); state int oldMode = -1; state BinaryWriter wr(Unversioned()); wr << mode; loop { try { Optional old = wait( tr.get( dataDistributionModeKey ) ); if (oldMode < 0) { oldMode = 1; if (old.present()) { BinaryReader rd(old.get(), Unversioned()); rd >> oldMode; } } if (!mode) { BinaryWriter wrMyOwner(Unversioned()); wrMyOwner << dataDistributionModeLock; tr.set( moveKeysLockOwnerKey, wrMyOwner.toStringRef() ); } tr.set( dataDistributionModeKey, wr.toStringRef() ); Void _ = wait( tr.commit() ); return oldMode; } catch (Error& e) { TraceEvent("setDDModeRetrying").error(e); Void _ = wait (tr.onError(e)); } } } ACTOR Future waitForExcludedServers( Database cx, vector excl ) { state std::set exclusions( excl.begin(), excl.end() ); if (!excl.size()) return Void(); loop { state Transaction tr(cx); try { tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS ); tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); // necessary? 
tr.setOption( FDBTransactionOptions::LOCK_AWARE ); // Just getting a consistent read version proves that a set of tlogs satisfying the exclusions has completed recovery // Check that there aren't any storage servers with addresses violating the exclusions Standalone serverList = wait( tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) ); ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY ); state bool ok = true; for(auto& s : serverList) { auto addr = decodeServerListValue( s.value ).address(); if ( addressExcluded(exclusions, addr) ) { ok = false; break; } } if (ok) { Optional> value = wait( tr.get(logsKey) ); ASSERT(value.present()); auto logs = decodeLogsValue(value.get()); for( auto const& log : logs.first ) { if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) { ok = false; break; } } for( auto const& log : logs.second ) { if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) { ok = false; break; } } } if (ok) return Void(); Void _ = wait( delayJittered( 1.0 ) ); // SOMEDAY: watches! 
} catch (Error& e) { Void _ = wait( tr.onError(e) ); } } } ACTOR Future timeKeeperSetDisable(Database cx) { loop { state Transaction tr(cx); try { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr.setOption(FDBTransactionOptions::LOCK_AWARE); tr.set(timeKeeperDisableKey, StringRef()); Void _ = wait(tr.commit()); return Void(); } catch (Error &e) { Void _ = wait(tr.onError(e)); } } } ACTOR Future lockDatabase( Transaction* tr, UID id ) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); Optional val = wait( tr->get(databaseLockedKey) ); if(val.present()) { if(BinaryReader::fromStringRef(val.get().substr(10), Unversioned()) == id) { return Void(); } else { //TraceEvent("DBA_lock_locked").detail("expecting", id).detail("lock", BinaryReader::fromStringRef(val.get().substr(10), Unversioned())); throw database_locked(); } } tr->atomicOp(databaseLockedKey, BinaryWriter::toValue(id, Unversioned()).withPrefix(LiteralStringRef("0123456789")), MutationRef::SetVersionstampedValue); tr->addWriteConflictRange(normalKeys); return Void(); } ACTOR Future lockDatabase( Reference tr, UID id ) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); Optional val = wait( tr->get(databaseLockedKey) ); if(val.present()) { if(BinaryReader::fromStringRef(val.get().substr(10), Unversioned()) == id) { return Void(); } else { //TraceEvent("DBA_lock_locked").detail("expecting", id).detail("lock", BinaryReader::fromStringRef(val.get().substr(10), Unversioned())); throw database_locked(); } } tr->atomicOp(databaseLockedKey, BinaryWriter::toValue(id, Unversioned()).withPrefix(LiteralStringRef("0123456789")), MutationRef::SetVersionstampedValue); tr->addWriteConflictRange(normalKeys); return Void(); } ACTOR Future lockDatabase( Database cx, UID id ) { state Transaction tr(cx); loop { try { Void _ = wait( lockDatabase(&tr, id) ); Void _ = wait( tr.commit() ); return Void(); } 
catch( Error &e ) { if(e.code() == error_code_database_locked) throw e; Void _ = wait( tr.onError(e) ); } } } ACTOR Future unlockDatabase( Transaction* tr, UID id ) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); Optional val = wait( tr->get(databaseLockedKey) ); if(!val.present()) return Void(); if(val.present() && BinaryReader::fromStringRef(val.get().substr(10), Unversioned()) != id) { //TraceEvent("DBA_unlock_locked").detail("expecting", id).detail("lock", BinaryReader::fromStringRef(val.get().substr(10), Unversioned())); throw database_locked(); } tr->clear(singleKeyRange(databaseLockedKey)); return Void(); } ACTOR Future unlockDatabase( Reference tr, UID id ) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); Optional val = wait( tr->get(databaseLockedKey) ); if(!val.present()) return Void(); if(val.present() && BinaryReader::fromStringRef(val.get().substr(10), Unversioned()) != id) { //TraceEvent("DBA_unlock_locked").detail("expecting", id).detail("lock", BinaryReader::fromStringRef(val.get().substr(10), Unversioned())); throw database_locked(); } tr->clear(singleKeyRange(databaseLockedKey)); return Void(); } ACTOR Future unlockDatabase( Database cx, UID id ) { state Transaction tr(cx); loop { try { Void _ = wait( unlockDatabase(&tr, id) ); Void _ = wait( tr.commit() ); return Void(); } catch( Error &e ) { if(e.code() == error_code_database_locked) throw e; Void _ = wait( tr.onError(e) ); } } } ACTOR Future checkDatabaseLock( Transaction* tr, UID id ) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); Optional val = wait( tr->get(databaseLockedKey) ); if (val.present() && BinaryReader::fromStringRef(val.get().substr(10), Unversioned()) != id) { //TraceEvent("DBA_check_locked").detail("expecting", id).detail("lock", BinaryReader::fromStringRef(val.get().substr(10), 
Unversioned())).backtrace(); throw database_locked(); } return Void(); } ACTOR Future checkDatabaseLock( Reference tr, UID id ) { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); Optional val = wait( tr->get(databaseLockedKey) ); if (val.present() && BinaryReader::fromStringRef(val.get().substr(10), Unversioned()) != id) { //TraceEvent("DBA_check_locked").detail("expecting", id).detail("lock", BinaryReader::fromStringRef(val.get().substr(10), Unversioned())).backtrace(); throw database_locked(); } return Void(); } TEST_CASE("ManagementAPI/AutoQuorumChange/checkLocality") { Void _ = wait(Future(Void())); std::vector workers; std::vector chosen; std::set excluded; AutoQuorumChange change(5); for(int i = 0; i < 10; i++) { ProcessData data; auto dataCenter = std::to_string(i / 4 % 2); auto dataHall = dataCenter + std::to_string(i / 2 % 2); auto rack = dataHall + std::to_string(i % 2); auto machineId = rack + std::to_string(i); data.locality.set(LiteralStringRef("dcid"), StringRef(dataCenter)); data.locality.set(LiteralStringRef("data_hall"), StringRef(dataHall)); data.locality.set(LiteralStringRef("rack"), StringRef(rack)); data.locality.set(LiteralStringRef("zoneid"), StringRef(rack)); data.locality.set(LiteralStringRef("machineid"), StringRef(machineId)); data.address.ip = i; workers.push_back(data); } change.addDesiredWorkers(chosen, workers, 5, excluded); std::map> chosenValues; ASSERT(chosen.size() == 5); std::vector fields({ LiteralStringRef("dcid"), LiteralStringRef("data_hall"), LiteralStringRef("zoneid"), LiteralStringRef("machineid") }); for(auto worker = chosen.begin(); worker != chosen.end(); worker++) { ASSERT(worker->ip < workers.size()); LocalityData data = workers[worker->ip].locality; for(auto field = fields.begin(); field != fields.end(); field++) { chosenValues[*field].insert(data.get(*field).get()); } } ASSERT(chosenValues[LiteralStringRef("dcid")].size() == 2); 
ASSERT(chosenValues[LiteralStringRef("data_hall")].size() == 4); ASSERT(chosenValues[LiteralStringRef("zoneid")].size() == 5); ASSERT(chosenValues[LiteralStringRef("machineid")].size() == 5); return Void(); }