diff --git a/contrib/cloud/ali-import b/contrib/cloud/ali-import index d51a4e159..7b5df222a 100755 --- a/contrib/cloud/ali-import +++ b/contrib/cloud/ali-import @@ -1,16 +1,48 @@ #!/usr/bin/env python3 +# +# Doing anything in Alibaba Cloud is unnecessarily difficult and +# tedious due to a combination of poor and inconsistent API design, +# high API call failure rates, and Chinese state censorship laws. +# +# We resort to a mixture of strategies to get images imported to all +# regions: +# +# - For regions with working OSS that are not blocked by Chinese +# state censorship laws, upload the image files to an OSS bucket +# and then import the images. +# +# - For regions with working OSS that are blocked by Chinese state +# censorship laws but that have working FC, use a temporary FC +# function to copy the image files from the uncensored OSS buckets +# and then import the images. Attempt downloads from a variety of +# uncensored buckets, since cross-region OSS traffic tends to +# experience a failure rate of around 10% of requests. +# +# - For regions that have working OSS but are blocked by Chinese +# state censorship laws and do not have working FC, or for regions +# that don't even have working OSS, resort to using CopyImage to +# copy the previously imported images from another region. Spread +# the imports across as many source regions as possible to +# minimise the effect of the CopyImage rate limiting. import argparse +import base64 from collections import namedtuple from concurrent.futures import ThreadPoolExecutor, as_completed import datetime -from itertools import cycle +import http +import io +from itertools import cycle, groupby +import json import logging +from operator import itemgetter from pathlib import Path +import random import subprocess import tempfile import time from uuid import uuid4 +import zipfile import alibabacloud_credentials as credentials import alibabacloud_credentials.client @@ -18,7 +50,16 @@ import alibabacloud_credentials.models import alibabacloud_ecs20140526 as ecs import alibabacloud_ecs20140526.client import alibabacloud_ecs20140526.models +import alibabacloud_fc20230330 as fc +import alibabacloud_fc20230330.client +import alibabacloud_fc20230330.models import alibabacloud_oss_v2 as oss +import alibabacloud_ram20150501 as ram +import alibabacloud_ram20150501.client +import alibabacloud_ram20150501.models +import alibabacloud_sts20150401 as sts +import alibabacloud_sts20150401.client +import alibabacloud_sts20150401.models import alibabacloud_tea_openapi as openapi import alibabacloud_tea_openapi.client import alibabacloud_tea_openapi.models @@ -26,25 +67,91 @@ import alibabacloud_tea_util as util import alibabacloud_tea_util.client import alibabacloud_tea_util.models -# For regions in mainland China, the Chinese state censorship laws -# prohibit direct access to OSS bucket contents. -# -# We work around this restriction by creating a temporary ECS instance -# in each region to access OSS via the internal OSS endpoints, which -# are not subject to these restrictions. Yes, this is absurd. - logger = logging.getLogger('ali-import') ECS_ENDPOINT = 'ecs.aliyuncs.com' +RAM_ENDPOINT = 'ram.aliyuncs.com' +STS_ENDPOINT = 'sts.aliyuncs.com' +FC_NODE_RUNTIME = 'nodejs20' +FC_TIMEOUT_SEC = 120 +FC_MEMORY_SIZE_MB = 128 +FC_SOURCE_COUNT = 10 + +OSS_FORBIDDEN_REGION_CODE = 'ForbidCreateNewBucket' OSS_BUCKET_NAME_LEN = 63 IPXE_STORAGE_PREFIX = 'ipxe-upload-temp-' -IPXE_STORAGE_TAG = 'ipxe-upload-temp' -Clients = namedtuple('Clients', ['region', 'ecs', 'oss']) +POLL_INTERVAL_SEC = 5 +POLL_MAX_RETRIES = 100 + +# For regions in mainland China, the Chinese state censorship laws +# prohibit direct access to OSS bucket contents. +# +# We work around this restriction by creating a temporary Function +# Compute function in each region to access OSS via the internal OSS +# endpoints, which are not subject to these restrictions. Yes, this +# is somewhat absurd. +# +IPXE_CENSORSHIP_BYPASS_FUNCTION = f''' +const prefix = "{IPXE_STORAGE_PREFIX}"; +''' + ''' +const assert = require("node:assert"); +const OSS = require("ali-oss"); +exports.handler = async (event, context) => { + const payload = JSON.parse(event.toString()); + console.log(JSON.stringify(payload)); + const sources = payload.sources || {}; + const dest = new OSS({ + region: "oss-" + context.region, + internal: true, + bucket: payload.bucket, + accessKeyId: context.credentials.accessKeyId, + accessKeySecret: context.credentials.accessKeySecret, + stsToken: context.credentials.securityToken, + }); + const current = ((await dest.listV2({prefix: prefix})).objects || []) + .map(x => x.name); + const wanted = Object.keys(sources); + const add = wanted.filter(x => ! current.includes(x)); + const del = current.filter(x => ! wanted.includes(x)); + assert(add.every(x => x.startsWith(prefix))); + assert(del.every(x => x.startsWith(prefix))); + if (add.length) + console.log("Creating: " + add.sort().join(", ")); + if (del.length) + console.log("Deleting: " + del.sort().join(", ")); + const copy = async (key) => { + for (const url of sources[key]) { + console.log("Downloading " + key + " from " + url); + try { + const download = await fetch(url, {signal: AbortSignal.timeout(15000)}); + if (! download.ok) + throw new Error(download.status); + const content = await download.arrayBuffer(); + console.log("Downloaded " + key); + console.log("Uploading " + key); + await dest.put(key, Buffer.from(content)); + console.log("Uploaded " + key); + return; + } catch (err) { + console.error("Download failed", err); + } + } + throw new Error("All downloads failed for " + key); + }; + await Promise.all([ + ...add.map(copy), + ...(del.length ? [dest.deleteMulti(del)] : []), + ]); + console.log("Finished"); +}; +''' + +Clients = namedtuple('Clients', ['region', 'censored', 'ecs', 'fc', 'oss']) Image = namedtuple('Image', - ['path', 'family', 'name', 'arch', 'mode', 'public']) + ['path', 'family', 'name', 'arch', 'mode', 'key', 'public']) def image(filename, basefamily, basename, public): """Construct image description""" @@ -69,7 +176,8 @@ def image(filename, basefamily, basename, public): name = '%s%s' % (basename, suffix) arch = uefi[0] if len(uefi) == 1 else None if uefi else 'x86_64' mode = 'UEFI' if uefi else 'BIOS' - return Image(path, family, name, arch, mode, public) + key = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4()) + return Image(path, family, name, arch, mode, key, public) def all_regions(): """Get list of all regions""" @@ -81,63 +189,177 @@ def all_regions(): regions = sorted(x.region_id for x in rsp.body.regions.region) return regions -def all_clients(region): +def account_id(): + """Get account ID""" + cred = credentials.client.Client() + conf = openapi.models.Config(credential=cred, endpoint=STS_ENDPOINT) + client = sts.client.Client(conf) + rsp = client.get_caller_identity() + return rsp.body.account_id + +def role_arn(name): + """Get role resource name""" + cred = credentials.client.Client() + conf = openapi.models.Config(credential=cred, endpoint=RAM_ENDPOINT) + client = ram.client.Client(conf) + req = ram.models.GetRoleRequest(role_name=name) + rsp = client.get_role(req) + return rsp.body.role.arn + +def all_clients(region, account): """Construct all per-region clients""" cred = credentials.client.Client() ecsconf = openapi.models.Config(credential=cred, region_id=region) + fcep = '%s.%s.fc.aliyuncs.com' % (account, region) + fcconf = openapi.models.Config(credential=cred, endpoint=fcep) osscred = oss.credentials.EnvironmentVariableCredentialsProvider() ossconf = oss.config.Config(credentials_provider=osscred, region=region) clients = Clients( region=region, + censored=region.startswith('cn-'), ecs=ecs.client.Client(ecsconf), + fc=fc.client.Client(fcconf), oss=oss.client.Client(ossconf), ) return clients -def delete_temp_bucket(clients, bucket): - """Remove temporary bucket""" - logger.info("%s: deleting %s" % (clients.region, bucket)) - assert bucket.startswith(IPXE_STORAGE_PREFIX) - req = oss.models.ListObjectsV2Request( - bucket=bucket, - prefix=IPXE_STORAGE_PREFIX, +def delete_temp_function(clients, func): + """Remove temporary function""" + logger.info("delete function %s %s" % (clients.region, func)) + assert func.startswith(IPXE_STORAGE_PREFIX) + clients.fc.delete_function(func) + +def create_temp_function(clients, role): + """Create temporary function (and remove any stale temporary functions)""" + req = fc.models.ListFunctionsRequest(prefix=IPXE_STORAGE_PREFIX) + try: + rsp = clients.fc.list_functions(req) + except openapi.client.UnretryableException: + # AliCloud provides no other way to detect non-working regions + return None + funcs = [x.function_name for x in rsp.body.functions or ()] + for func in funcs: + delete_temp_function(clients, func) + if not clients.censored: + # Functions are not required in uncensored regions + return None + buf = io.BytesIO() + with zipfile.ZipFile(buf, 'w') as zfh: + zfh.writestr('index.js', IPXE_CENSORSHIP_BYPASS_FUNCTION) + zf = base64.b64encode(buf.getvalue()).decode() + code = fc.models.InputCodeLocation(zip_file=zf) + func = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4()) + body = fc.models.CreateFunctionInput( + code=code, + function_name=func, + handler='index.handler', + memory_size=FC_MEMORY_SIZE_MB, + role=role, + runtime=FC_NODE_RUNTIME, + timeout=FC_TIMEOUT_SEC, ) - rsp = clients.oss.list_objects_v2(req) - delete = [x.key for x in rsp.contents or ()] - if delete: - req = oss.models.DeleteMultipleObjectsRequest( + req = fc.models.CreateFunctionRequest(body=body) + rsp = clients.fc.create_function(req) + logger.info("create function %s %s" % (clients.region, func)) + return func + +def call_temp_function(clients, func, payload): + """Call temporary function""" + hdr = fc.models.InvokeFunctionHeaders( + x_fc_invocation_type='Sync', + x_fc_log_type='Tail', + ) + body = json.dumps(payload) + req = fc.models.InvokeFunctionRequest(body=body) + run = util.models.RuntimeOptions( + autoretry=True, + max_attempts=5, + connect_timeout=10000, + read_timeout=120000, + ) + rsp = clients.fc.invoke_function_with_options(func, req, hdr, run) + log = base64.b64decode(rsp.headers.get('x-fc-log-result', b'')).decode() + if rsp.status_code != http.HTTPStatus.OK: + raise RuntimeError(rsp) + if 'x-fc-error-type' in rsp.headers: + raise RuntimeError(log) + +def delete_temp_bucket(clients, func, bucket): + """Remove temporary bucket""" + logger.info("delete bucket %s %s" % (clients.region, bucket)) + assert bucket.startswith(IPXE_STORAGE_PREFIX) + # Delete bucket contents + if not clients.censored: + # Uncensored region: use OSS API calls to delete contents + req = oss.models.ListObjectsV2Request( bucket=bucket, - objects=[oss.models.DeleteObject(x) for x in delete], + prefix=IPXE_STORAGE_PREFIX, ) - rsp = clients.oss.delete_multiple_objects(req) + rsp = clients.oss.list_objects_v2(req) + delete = [x.key for x in rsp.contents or ()] + if delete: + req = oss.models.DeleteMultipleObjectsRequest( + bucket=bucket, + objects=[oss.models.DeleteObject(x) for x in delete], + ) + rsp = clients.oss.delete_multiple_objects(req) + elif func: + # Censored region with FC: use function to delete contents + payload = {'bucket': bucket} + call_temp_function(clients, func, payload) + else: + # Censored region without FC: assume bucket must be empty, + # since we could not have uploaded to it in the first place + pass + # Delete the now-empty bucket req = oss.models.DeleteBucketRequest(bucket=bucket) rsp = clients.oss.delete_bucket(req) -def create_temp_bucket(clients): +def create_temp_bucket(clients, func): """Create temporary bucket (and remove any stale temporary buckets)""" - if clients.region.startswith('cn-'): - # Object storage is non-functional in Chinese mainland regions - # due to censorship restrictions - return None prefix = '%s%s-' % (IPXE_STORAGE_PREFIX, clients.region) req = oss.models.ListBucketsRequest(prefix=prefix) rsp = clients.oss.list_buckets(req) buckets = [x.name for x in rsp.buckets or ()] for bucket in buckets: - delete_temp_bucket(clients, bucket) + delete_temp_bucket(clients, func, bucket) + if clients.censored and not func: + # We cannot use OSS in censored regions with no Function Compute + return None bucket = ('%s%s' % (prefix, uuid4()))[:OSS_BUCKET_NAME_LEN] req = oss.models.PutBucketRequest(bucket=bucket) - rsp = clients.oss.put_bucket(req) - logger.info("%s: created %s" % (clients.region, bucket)) + try: + rsp = clients.oss.put_bucket(req) + except oss.exceptions.OperationError as exc: + # AliCloud provides no other way to detect non-working regions + if exc.unwrap().code == OSS_FORBIDDEN_REGION_CODE: + return None + raise exc + logger.info("create bucket %s %s" % (clients.region, bucket)) return bucket -def upload_image(clients, bucket, image): - """Upload disk image to uncensored bucket""" - logger.info("%s: uploading %s" % (clients.region, image.name)) - key = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4()) - req = oss.models.PutObjectRequest(bucket=bucket, key=key) +def upload_object(clients, bucket, image): + """Upload disk image object to uncensored bucket""" + logger.info("upload %s %s" % (clients.region, image.name)) + req = oss.models.PutObjectRequest(bucket=bucket, key=image.key) rsp = clients.oss.put_object_from_file(req, image.path) - return key + req = oss.models.GetObjectRequest(bucket=bucket, key=image.key) + rsp = clients.oss.presign(req) + return rsp.url + +def copy_objects(clients, bucket, func, uploads): + """Copy disk image objects to censored bucket from uncensored bucket""" + logger.info("upload %s (censored)" % clients.region) + payload = { + 'bucket': bucket, + 'sources': { + key: random.choices([url for key, url in urls], k=FC_SOURCE_COUNT) + for key, urls in groupby(sorted( + ((image.key, url) for (region, image), url in uploads.items()) + ), key=itemgetter(0)) + } + } + call_temp_function(clients, func, payload) def delete_image(clients, name): """Remove existing image (if applicable)""" @@ -148,7 +370,7 @@ def delete_image(clients, name): ) rsp = clients.ecs.describe_images(req) for image in rsp.body.images.image or (): - logger.info("%s: deleting %s (%s)" % + logger.info("delete image %s %s (%s)" % (clients.region, image.image_name, image.image_id)) if image.is_public: req = ecs.models.ModifyImageSharePermissionRequest( @@ -165,8 +387,9 @@ def delete_image(clients, name): def wait_for_task(clients, task_id): """Wait for task to complete""" - while True: - time.sleep(5) + status = 'Unknowable' + for i in range(POLL_MAX_RETRIES): + time.sleep(POLL_INTERVAL_SEC) req = ecs.models.DescribeTasksRequest( region_id=clients.region, task_ids=task_id, @@ -182,8 +405,9 @@ def wait_for_task(clients, task_id): def wait_for_image(clients, image_id): """Wait for image to become available""" - while True: - time.sleep(5) + status = 'Unknowable' + for i in range(POLL_MAX_RETRIES): + time.sleep(POLL_INTERVAL_SEC) req = ecs.models.DescribeImagesRequest( region_id=clients.region, image_id=image_id, @@ -198,14 +422,14 @@ def wait_for_image(clients, image_id): if status != 'Available': raise RuntimeError(status) -def import_image(clients, image, bucket, key): +def import_image(clients, image, bucket): """Import image""" - logger.info("%s: importing %s" % (clients.region, image.name)) + logger.info("import %s %s" % (clients.region, image.name)) disk = ecs.models.ImportImageRequestDiskDeviceMapping( disk_image_size = 1, format = 'RAW', ossbucket = bucket, - ossobject = key, + ossobject = image.key, ) req = ecs.models.ImportImageRequest( region_id=clients.region, @@ -219,14 +443,14 @@ def import_image(clients, image, bucket, key): task_id = rsp.body.task_id wait_for_task(clients, task_id) wait_for_image(clients, image_id) - logger.info("%s: imported %s (%s)" % + logger.info("image %s %s (%s)" % (clients.region, image.name, image_id)) return image_id def copy_image(clients, image, image_id, censored): """Copy imported image to censored region""" - logger.info("%s: copying %s (%s) to %s" % - (clients.region, image.name, image_id, censored.region)) + logger.info("import %s %s via %s" % + (censored.region, image.name, clients.region)) req = ecs.models.CopyImageRequest( region_id=clients.region, image_id=image_id, @@ -236,14 +460,12 @@ def copy_image(clients, image, image_id, censored): rsp = clients.ecs.copy_image(req) copy_id = rsp.body.image_id wait_for_image(censored, copy_id) - logger.info("%s: copied %s (%s) to %s" % - (clients.region, image.name, copy_id, censored.region)) + logger.info("image %s %s (%s)" % (censored.region, image.name, copy_id)) return copy_id def finalise_image(clients, image, image_id): """Finalise image attributes and permissions""" - logger.info("%s: finalising %s (%s)" % - (clients.region, image.name, image_id)) + logger.info("finalise %s %s (%s)" % (clients.region, image.name, image_id)) req = ecs.models.ModifyImageAttributeRequest( region_id=clients.region, image_id=image_id, @@ -271,6 +493,8 @@ parser.add_argument('--overwrite', action='store_true', help="Overwrite any existing image with same name") parser.add_argument('--region', '-r', action='append', help="AliCloud region(s)") +parser.add_argument('--role', '-R', default="AliyunFcDefaultRole", + help="AliCloud role for censorship bypass function") parser.add_argument('image', nargs='+', help="iPXE disk image") args = parser.parse_args() @@ -278,6 +502,7 @@ args = parser.parse_args() loglevels = [logging.WARNING, logging.INFO, logging.DEBUG] verbosity = min(args.verbose, (len(loglevels) - 1)) logging.basicConfig(level=loglevels[verbosity]) +logging.getLogger('apscheduler').setLevel(logging.WARNING) # Use default name if none specified if not args.name: @@ -290,59 +515,82 @@ regions = args.region or all_regions() # Construct image list images = [image(x, args.family, args.name, args.public) for x in args.image] imports = [(region, image) for region in regions for image in images] +workers = len(imports) + +# Look up resource names +fcrole = role_arn(args.role) # Construct per-region clients -clients = {region: all_clients(region) for region in regions} +account = account_id() +clients = {region: all_clients(region, account) for region in regions} # Delete existing images from all regions, if applicable if args.overwrite: - with ThreadPoolExecutor(max_workers=len(imports)) as executor: + with ThreadPoolExecutor(max_workers=workers) as executor: futures = {executor.submit(delete_image, clients=clients[region], name=image.name): (region, image) for region, image in imports} done = {futures[x]: x.result() for x in as_completed(futures)} -# Create temporary buckets in all uncensored regions -with ThreadPoolExecutor(max_workers=len(regions)) as executor: +# Create temporary function in each censored region with usable FC +with ThreadPoolExecutor(max_workers=workers) as executor: + futures = {executor.submit(create_temp_function, + clients=clients[region], + role=fcrole): region + for region in regions} + funcs = {futures[x]: x.result() for x in as_completed(futures)} + +# Create temporary bucket in each region with usable OSS +with ThreadPoolExecutor(max_workers=workers) as executor: futures = {executor.submit(create_temp_bucket, - clients=clients[region]): region + clients=clients[region], + func=funcs[region]): region for region in regions} buckets = {futures[x]: x.result() for x in as_completed(futures)} -if not any(buckets.values()): - parser.error("At least one non-Chinese region is required") -# Upload images directly to uncensored regions -with ThreadPoolExecutor(max_workers=len(imports)) as executor: - futures = {executor.submit(upload_image, +# Upload image objects directly to each uncensored region with usable OSS +with ThreadPoolExecutor(max_workers=workers) as executor: + futures = {executor.submit(upload_object, clients=clients[region], bucket=buckets[region], image=image): (region, image) - for region, image in imports if buckets[region]} - keys = {futures[x]: x.result() for x in as_completed(futures)} + for region, image in imports + if buckets[region] and not funcs[region]} + uploads = {futures[x]: x.result() for x in as_completed(futures)} +if not uploads: + parser.error("At least one working non-Chinese region is required") -# Import images to uncensored regions -with ThreadPoolExecutor(max_workers=len(imports)) as executor: +# Copy image objects to each censored region with usable OSS and usable FC +with ThreadPoolExecutor(max_workers=workers) as executor: + futures = {executor.submit(copy_objects, + clients=clients[region], + bucket=buckets[region], + func=funcs[region], + uploads=uploads): region + for region in regions + if buckets[region] and funcs[region]} + done = {futures[x]: x.result() for x in as_completed(futures)} + +# Import images in each region with usable OSS +with ThreadPoolExecutor(max_workers=workers) as executor: futures = {executor.submit(import_image, clients=clients[region], image=image, - bucket=buckets[region], - key=keys[(region, image)]): (region, image) + bucket=buckets[region]): (region, image) for region, image in imports if buckets[region]} results = {futures[x]: x.result() for x in as_completed(futures)} -# Select source uncensored region for each copy +# Copy images to regions without usable OSS # # Copies are rate-limited by source region, so spread the copies -# across all available uncensored regions. +# across all available regions with imported images. # copies = [(region, censored, image) for region, (censored, image) in zip( cycle(region for region in regions if buckets[region]), ((region, image) for region, image in imports if not buckets[region]), )] - -# Copy images to censored regions -with ThreadPoolExecutor(max_workers=len(imports)) as executor: +with ThreadPoolExecutor(max_workers=workers) as executor: futures = {executor.submit(copy_image, clients=clients[region], censored=clients[censored], @@ -350,10 +598,11 @@ with ThreadPoolExecutor(max_workers=len(imports)) as executor: image_id=results[(region, image)]): (censored, image) for region, censored, image in copies} - results.update({futures[x]: x.result() for x in as_completed(futures)}) + copied = {futures[x]: x.result() for x in as_completed(futures)} + results.update(copied) # Finalise images -with ThreadPoolExecutor(max_workers=len(imports)) as executor: +with ThreadPoolExecutor(max_workers=workers) as executor: futures = {executor.submit(finalise_image, clients=clients[region], image=image, @@ -363,13 +612,22 @@ with ThreadPoolExecutor(max_workers=len(imports)) as executor: done = {futures[x]: x.result() for x in as_completed(futures)} # Remove temporary buckets -with ThreadPoolExecutor(max_workers=len(regions)) as executor: +with ThreadPoolExecutor(max_workers=workers) as executor: futures = {executor.submit(delete_temp_bucket, clients=clients[region], + func=funcs[region], bucket=buckets[region]): region for region in regions if buckets[region]} done = {futures[x]: x.result() for x in as_completed(futures)} +# Remove temporary functions +with ThreadPoolExecutor(max_workers=workers) as executor: + futures = {executor.submit(delete_temp_function, + clients=clients[region], + func=funcs[region]): region + for region in regions if funcs[region]} + done = {futures[x]: x.result() for x in as_completed(futures)} + # Show created images for region, image in imports: image_id = results[(region, image)]