mirror of
https://github.com/apple/foundationdb.git
synced 2025-06-01 18:56:00 +08:00
Added BlobStoreEndpoint::listBuckets(), renamed listBucket() and several related functions with similar names to listObjects() to avoid confusion and closer match what it actually does. Added a bytesDeleted output statistic to BlobStoreEndpoint::deleteRecursively.
This commit is contained in:
parent
c75b7f63c9
commit
09e8d804e8
@ -1458,7 +1458,7 @@ public:
|
||||
return pathFilter(folderPath.substr(prefixTrim));
|
||||
};
|
||||
|
||||
state BlobStoreEndpoint::ListResult result = wait(bc->m_bstore->listBucket(bc->m_bucket, bc->dataPath(path), '/', std::numeric_limits<int>::max(), rawPathFilter));
|
||||
state BlobStoreEndpoint::ListResult result = wait(bc->m_bstore->listObjects(bc->m_bucket, bc->dataPath(path), '/', std::numeric_limits<int>::max(), rawPathFilter));
|
||||
FilesAndSizesT files;
|
||||
for(auto &o : result.objects) {
|
||||
ASSERT(o.name.size() >= prefixTrim);
|
||||
|
@ -320,11 +320,11 @@ Future<Void> BlobStoreEndpoint::deleteObject(std::string const &bucket, std::str
|
||||
return deleteObject_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, object);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> deleteRecursively_impl(Reference<BlobStoreEndpoint> b, std::string bucket, std::string prefix, int *pNumDeleted) {
|
||||
ACTOR Future<Void> deleteRecursively_impl(Reference<BlobStoreEndpoint> b, std::string bucket, std::string prefix, int *pNumDeleted, int64_t *pBytesDeleted) {
|
||||
state PromiseStream<BlobStoreEndpoint::ListResult> resultStream;
|
||||
// Start a recursive parallel listing which will send results to resultStream as they are received
|
||||
state Future<Void> done = b->listBucketStream(bucket, resultStream, prefix, '/', std::numeric_limits<int>::max());
|
||||
// Wrap done in an actor which will send end_of_stream since listBucketStream() does not (so that many calls can write to the same stream)
|
||||
state Future<Void> done = b->listObjectsStream(bucket, resultStream, prefix, '/', std::numeric_limits<int>::max());
|
||||
// Wrap done in an actor which will send end_of_stream since listObjectsStream() does not (so that many calls can write to the same stream)
|
||||
done = map(done, [=](Void) {
|
||||
resultStream.sendError(end_of_stream());
|
||||
return Void();
|
||||
@ -341,10 +341,13 @@ ACTOR Future<Void> deleteRecursively_impl(Reference<BlobStoreEndpoint> b, std::s
|
||||
|
||||
when(BlobStoreEndpoint::ListResult list = waitNext(resultStream.getFuture())) {
|
||||
for(auto &object : list.objects) {
|
||||
int *pNumDeletedCopy = pNumDeleted; // avoid capture of this
|
||||
deleteFutures.push_back(map(b->deleteObject(bucket, object.name), [pNumDeletedCopy](Void) -> Void {
|
||||
if(pNumDeletedCopy != nullptr)
|
||||
++*pNumDeletedCopy;
|
||||
deleteFutures.push_back(map(b->deleteObject(bucket, object.name), [=](Void) -> Void {
|
||||
if(pNumDeleted != nullptr) {
|
||||
++*pNumDeleted;
|
||||
}
|
||||
if(pBytesDeleted != nullptr) {
|
||||
*pBytesDeleted += object.size;
|
||||
}
|
||||
return Void();
|
||||
}));
|
||||
}
|
||||
@ -370,8 +373,8 @@ ACTOR Future<Void> deleteRecursively_impl(Reference<BlobStoreEndpoint> b, std::s
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> BlobStoreEndpoint::deleteRecursively(std::string const &bucket, std::string prefix, int *pNumDeleted) {
|
||||
return deleteRecursively_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, prefix, pNumDeleted);
|
||||
Future<Void> BlobStoreEndpoint::deleteRecursively(std::string const &bucket, std::string prefix, int *pNumDeleted, int64_t *pBytesDeleted) {
|
||||
return deleteRecursively_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, prefix, pNumDeleted, pBytesDeleted);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> createBucket_impl(Reference<BlobStoreEndpoint> b, std::string bucket) {
|
||||
@ -703,7 +706,7 @@ Future<Reference<HTTP::Response>> BlobStoreEndpoint::doRequest(std::string const
|
||||
return doRequest_impl(Reference<BlobStoreEndpoint>::addRef(this), verb, resource, headers, pContent, contentLen, successCodes);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, PromiseStream<BlobStoreEndpoint::ListResult> results, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth, std::function<bool(std::string const &)> recurseFilter) {
|
||||
ACTOR Future<Void> listObjectsStream_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, PromiseStream<BlobStoreEndpoint::ListResult> results, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth, std::function<bool(std::string const &)> recurseFilter) {
|
||||
// Request 1000 keys at a time, the maximum allowed
|
||||
state std::string resource = "/";
|
||||
resource.append(bucket);
|
||||
@ -782,7 +785,7 @@ ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, st
|
||||
if(maxDepth > 0) {
|
||||
// If there is no recurse filter or the filter returns true then start listing the subfolder
|
||||
if(!recurseFilter || recurseFilter(prefix)) {
|
||||
subLists.push_back(bstore->listBucketStream(bucket, results, prefix, delimiter, maxDepth - 1, recurseFilter));
|
||||
subLists.push_back(bstore->listObjectsStream(bucket, results, prefix, delimiter, maxDepth - 1, recurseFilter));
|
||||
}
|
||||
// Since prefix will not be in the final listResult below we have to set lastFile here in case it's greater than the last object
|
||||
lastFile = prefix;
|
||||
@ -829,14 +832,14 @@ ACTOR Future<Void> listBucketStream_impl(Reference<BlobStoreEndpoint> bstore, st
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> BlobStoreEndpoint::listBucketStream(std::string const &bucket, PromiseStream<ListResult> results, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth, std::function<bool(std::string const &)> recurseFilter) {
|
||||
return listBucketStream_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, results, prefix, delimiter, maxDepth, recurseFilter);
|
||||
Future<Void> BlobStoreEndpoint::listObjectsStream(std::string const &bucket, PromiseStream<ListResult> results, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth, std::function<bool(std::string const &)> recurseFilter) {
|
||||
return listObjectsStream_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, results, prefix, delimiter, maxDepth, recurseFilter);
|
||||
}
|
||||
|
||||
ACTOR Future<BlobStoreEndpoint::ListResult> listBucket_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth, std::function<bool(std::string const &)> recurseFilter) {
|
||||
ACTOR Future<BlobStoreEndpoint::ListResult> listObjects_impl(Reference<BlobStoreEndpoint> bstore, std::string bucket, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth, std::function<bool(std::string const &)> recurseFilter) {
|
||||
state BlobStoreEndpoint::ListResult results;
|
||||
state PromiseStream<BlobStoreEndpoint::ListResult> resultStream;
|
||||
state Future<Void> done = bstore->listBucketStream(bucket, resultStream, prefix, delimiter, maxDepth, recurseFilter);
|
||||
state Future<Void> done = bstore->listObjectsStream(bucket, resultStream, prefix, delimiter, maxDepth, recurseFilter);
|
||||
// Wrap done in an actor which sends end_of_stream because list does not so that many lists can write to the same stream
|
||||
done = map(done, [=](Void) {
|
||||
resultStream.sendError(end_of_stream());
|
||||
@ -865,8 +868,75 @@ ACTOR Future<BlobStoreEndpoint::ListResult> listBucket_impl(Reference<BlobStoreE
|
||||
return results;
|
||||
}
|
||||
|
||||
Future<BlobStoreEndpoint::ListResult> BlobStoreEndpoint::listBucket(std::string const &bucket, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth, std::function<bool(std::string const &)> recurseFilter) {
|
||||
return listBucket_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, prefix, delimiter, maxDepth, recurseFilter);
|
||||
Future<BlobStoreEndpoint::ListResult> BlobStoreEndpoint::listObjects(std::string const &bucket, Optional<std::string> prefix, Optional<char> delimiter, int maxDepth, std::function<bool(std::string const &)> recurseFilter) {
|
||||
return listObjects_impl(Reference<BlobStoreEndpoint>::addRef(this), bucket, prefix, delimiter, maxDepth, recurseFilter);
|
||||
}
|
||||
|
||||
ACTOR Future<std::vector<std::string>> listBuckets_impl(Reference<BlobStoreEndpoint> bstore) {
|
||||
state std::string resource = "/?marker=";
|
||||
state std::string lastName;
|
||||
state bool more = true;
|
||||
state std::vector<std::string> buckets;
|
||||
|
||||
while(more) {
|
||||
wait(bstore->concurrentLists.take());
|
||||
state FlowLock::Releaser listReleaser(bstore->concurrentLists, 1);
|
||||
|
||||
HTTP::Headers headers;
|
||||
state std::string fullResource = resource + HTTP::urlEncode(lastName);
|
||||
Reference<HTTP::Response> r = wait(bstore->doRequest("GET", fullResource, headers, NULL, 0, {200}));
|
||||
listReleaser.release();
|
||||
|
||||
try {
|
||||
xml_document<> doc;
|
||||
|
||||
// Copy content because rapidxml will modify it during parse
|
||||
std::string content = r->content;
|
||||
doc.parse<0>((char *)content.c_str());
|
||||
|
||||
// There should be exactly one node
|
||||
xml_node<> *result = doc.first_node();
|
||||
if(result == nullptr || strcmp(result->name(), "ListAllMyBucketsResult") != 0) {
|
||||
throw http_bad_response();
|
||||
}
|
||||
|
||||
more = false;
|
||||
xml_node<> *truncated = result->first_node("IsTruncated");
|
||||
if(truncated != nullptr && strcmp(truncated->value(), "true") == 0) {
|
||||
more = true;
|
||||
}
|
||||
|
||||
xml_node<> *bucketsNode = result->first_node("Buckets");
|
||||
if(bucketsNode != nullptr) {
|
||||
xml_node<> *bucketNode = bucketsNode->first_node("Bucket");
|
||||
while(bucketNode != nullptr) {
|
||||
xml_node<> *nameNode = bucketNode->first_node("Name");
|
||||
if(nameNode == nullptr) {
|
||||
throw http_bad_response();
|
||||
}
|
||||
const char *name = nameNode->value();
|
||||
buckets.push_back(name);
|
||||
|
||||
bucketNode = bucketNode->next_sibling("Bucket");
|
||||
}
|
||||
}
|
||||
|
||||
if(more) {
|
||||
lastName = buckets.back();
|
||||
}
|
||||
|
||||
} catch(Error &e) {
|
||||
if(e.code() != error_code_actor_cancelled)
|
||||
TraceEvent(SevWarn, "BlobStoreEndpointListBucketResultParseError").error(e).suppressFor(60).detail("Resource", fullResource);
|
||||
throw http_bad_response();
|
||||
}
|
||||
}
|
||||
|
||||
return buckets;
|
||||
}
|
||||
|
||||
Future<std::vector<std::string>> BlobStoreEndpoint::listBuckets() {
|
||||
return listBuckets_impl(Reference<BlobStoreEndpoint>::addRef(this));
|
||||
}
|
||||
|
||||
std::string BlobStoreEndpoint::hmac_sha1(std::string const &msg) {
|
||||
|
@ -193,10 +193,13 @@ public:
|
||||
// Get bucket contents via a stream, since listing large buckets will take many serial blob requests
|
||||
// If a delimiter is passed then common prefixes will be read in parallel, recursively, depending on recurseFilter.
|
||||
// Recursefilter is a must be a function that takes a string and returns true if it passes. The default behavior is to assume true.
|
||||
Future<Void> listBucketStream(std::string const &bucket, PromiseStream<ListResult> results, Optional<std::string> prefix = {}, Optional<char> delimiter = {}, int maxDepth = 0, std::function<bool(std::string const &)> recurseFilter = nullptr);
|
||||
Future<Void> listObjectsStream(std::string const &bucket, PromiseStream<ListResult> results, Optional<std::string> prefix = {}, Optional<char> delimiter = {}, int maxDepth = 0, std::function<bool(std::string const &)> recurseFilter = nullptr);
|
||||
|
||||
// Get a list of the files in a bucket, see listBucketStream for more argument detail.
|
||||
Future<ListResult> listBucket(std::string const &bucket, Optional<std::string> prefix = {}, Optional<char> delimiter = {}, int maxDepth = 0, std::function<bool(std::string const &)> recurseFilter = nullptr);
|
||||
// Get a list of the files in a bucket, see listObjectsStream for more argument detail.
|
||||
Future<ListResult> listObjects(std::string const &bucket, Optional<std::string> prefix = {}, Optional<char> delimiter = {}, int maxDepth = 0, std::function<bool(std::string const &)> recurseFilter = nullptr);
|
||||
|
||||
// Get a list of all buckets
|
||||
Future<std::vector<std::string>> listBuckets();
|
||||
|
||||
// Check if a bucket exists
|
||||
Future<bool> bucketExists(std::string const &bucket);
|
||||
@ -216,9 +219,9 @@ public:
|
||||
// Delete all objects in a bucket under a prefix. Note this is not atomic as blob store does not
|
||||
// support this operation directly. This method is just a convenience method that lists and deletes
|
||||
// all of the objects in the bucket under the given prefix.
|
||||
// Since it can take a while, if a pNumDeleted is provided then it will be incremented every time
|
||||
// Since it can take a while, if a pNumDeleted and/or pBytesDeleted are provided they will be incremented every time
|
||||
// a deletion of an object completes.
|
||||
Future<Void> deleteRecursively(std::string const &bucket, std::string prefix = "", int *pNumDeleted = NULL);
|
||||
Future<Void> deleteRecursively(std::string const &bucket, std::string prefix = "", int *pNumDeleted = nullptr, int64_t *pBytesDeleted = nullptr);
|
||||
|
||||
// Create a bucket if it does not already exists.
|
||||
Future<Void> createBucket(std::string const &bucket);
|
||||
|
Loading…
x
Reference in New Issue
Block a user