diff --git a/contrib/cloud/ali-import b/contrib/cloud/ali-import index 1792208a3..30d2ba763 100755 --- a/contrib/cloud/ali-import +++ b/contrib/cloud/ali-import @@ -4,16 +4,13 @@ import argparse import base64 from collections import namedtuple from concurrent.futures import ThreadPoolExecutor, as_completed -from datetime import date -import http -import io -import json +import datetime +import logging from pathlib import Path import subprocess import tempfile import time from uuid import uuid4 -import zipfile import alibabacloud_credentials as credentials import alibabacloud_credentials.client @@ -21,87 +18,39 @@ import alibabacloud_credentials.models import alibabacloud_ecs20140526 as ecs import alibabacloud_ecs20140526.client import alibabacloud_ecs20140526.models -import alibabacloud_fc20230330 as fc -import alibabacloud_fc20230330.client -import alibabacloud_fc20230330.models import alibabacloud_oss_v2 as oss -import alibabacloud_ram20150501 as ram -import alibabacloud_ram20150501.client -import alibabacloud_ram20150501.models -import alibabacloud_sts20150401 as sts -import alibabacloud_sts20150401.client -import alibabacloud_sts20150401.models import alibabacloud_tea_openapi as openapi import alibabacloud_tea_openapi.client import alibabacloud_tea_openapi.models import alibabacloud_tea_util as util import alibabacloud_tea_util.client import alibabacloud_tea_util.models +import alibabacloud_vpc20160428 as vpc +import alibabacloud_vpc20160428.client +import alibabacloud_vpc20160428.models + +# For regions in mainland China, the Chinese state censorship laws +# prohibit direct access to OSS bucket contents. +# +# We work around this restriction by creating a temporary ECS instance +# in each region to access OSS via the internal OSS endpoints, which +# are not subject to these restrictions. Yes, this is absurd. + +logger = logging.getLogger('ali-import') ECS_ENDPOINT = 'ecs.aliyuncs.com' -RAM_ENDPOINT = 'ram.aliyuncs.com' -STS_ENDPOINT = 'sts.aliyuncs.com' - -FC_NODE_RUNTIME = 'nodejs20' -FC_MAX_ATTEMPTS = 5 -FC_CONNECT_TIMEOUT_MS = 10000 -FC_READ_TIMEOUT_MS = 60000 -FC_TIMEOUT_SEC = 60 -FC_MEMORY_SIZE_MB = 128 OSS_FORBIDDEN_REGION_CODE = 'ForbidCreateNewBucket' OSS_BUCKET_NAME_LEN = 63 IPXE_STORAGE_PREFIX = 'ipxe-upload-temp-' +IPXE_STORAGE_TAG = 'ipxe-upload-temp' +IPXE_STORAGE_DISK_CATEGORY = 'cloud_essd' -# For regions in mainland China, the Chinese state censorship laws -# prohibit direct access to OSS bucket contents. -# -# We work around this restriction by creating a temporary Function -# Compute function in each region to access OSS via the internal OSS -# endpoints, which are not subject to these restrictions. Yes, this -# is somewhat absurd. -# -IPXE_CENSORSHIP_BYPASS_FUNCTION = f''' -const prefix = "{IPXE_STORAGE_PREFIX}"; -''' + ''' -const assert = require("node:assert"); -const OSS = require("ali-oss"); -exports.handler = async (event, context) => { - const payload = JSON.parse(event.toString()); - console.log(JSON.stringify(payload)); - const src = payload.source && new OSS({ - region: "oss-" + payload.source.region, - bucket: payload.source.bucket, - accessKeyId: context.credentials.accessKeyId, - accessKeySecret: context.credentials.accessKeySecret, - stsToken: context.credentials.securityToken, - }); - const dst = new OSS({ - region: "oss-" + context.region, - internal: true, - bucket: payload.bucket, - accessKeyId: context.credentials.accessKeyId, - accessKeySecret: context.credentials.accessKeySecret, - stsToken: context.credentials.securityToken, - }); - const add = payload.keys || []; - const del = ((await dst.listV2({prefix: prefix})).objects || []) - .map(x => x.name).filter(x => ! add.includes(x)); - assert(add.every(x => x.startsWith(prefix))); - assert(del.every(x => x.startsWith(prefix))); - if (add.length) - console.log("Creating: " + add.sort().join(", ")); - if (del.length) - console.log("Deleting: " + del.sort().join(", ")); - await Promise.all([ - ...add.map(async (x) => dst.putStream(x, (await src.getStream(x)).stream)), - ...(del.length ? [dst.deleteMulti(del)] : []), - ]); -}; -''' +IPXE_SG_TAG = 'ipxe-default-sg' +IPXE_VSWITCH_TAG = 'ipxe-default-vswitch' -Clients = namedtuple('Clients', ['region', 'ecs', 'fc', 'oss']) +Clients = namedtuple('Clients', ['region', 'ecs', 'oss', 'vpc']) Image = namedtuple('Image', ['path', 'family', 'name', 'arch', 'mode']) def image(filename, basefamily, basename): @@ -139,111 +88,179 @@ def all_regions(): regions = sorted(x.region_id for x in rsp.body.regions.region) return regions -def account_id(): - """Get account ID""" - cred = credentials.client.Client() - conf = openapi.models.Config(credential=cred, endpoint=STS_ENDPOINT) - client = sts.client.Client(conf) - rsp = client.get_caller_identity() - return rsp.body.account_id - -def role_arn(name): - """Get role resource name""" - cred = credentials.client.Client() - conf = openapi.models.Config(credential=cred, endpoint=RAM_ENDPOINT) - client = ram.client.Client(conf) - req = ram.models.GetRoleRequest(role_name=name) - rsp = client.get_role(req) - return rsp.body.role.arn - -def all_clients(region, account): +def all_clients(region): """Construct all per-region clients""" cred = credentials.client.Client() ecsconf = openapi.models.Config(credential=cred, region_id=region) - fcep = '%s.%s.fc.aliyuncs.com' % (account, region) - fcconf = openapi.models.Config(credential=cred, endpoint=fcep) osscred = oss.credentials.EnvironmentVariableCredentialsProvider() ossconf = oss.config.Config(credentials_provider=osscred, region=region) + vpcconf = openapi.models.Config(credential=cred, region_id=region) clients = Clients( region=region, ecs=ecs.client.Client(ecsconf), - fc=fc.client.Client(fcconf), oss=oss.client.Client(ossconf), + vpc=vpc.client.Client(vpcconf), ) return clients -def delete_temp_function(clients, func): - """Remove temporary function""" - assert func.startswith(IPXE_STORAGE_PREFIX) - clients.fc.delete_function(func) +def delete_temp_instance(clients, instance, retry=False): + """Remove temporary instance""" + logger.info("%s: deleting %s" % (clients.region, instance)) + while True: + req = ecs.models.DeleteInstanceRequest( + instance_id=instance, + force=True, + force_stop=True, + ) + try: + rsp = clients.ecs.delete_instance(req) + except openapi.exceptions.ClientException: + # Very recently created instances often cannot be + # terminated until some undocumented part of the control + # plane decides that enough time has elapsed + if retry: + time.sleep(1) + continue + raise + break -def create_temp_function(clients, role): - """Create temporary function (and remove any stale temporary functions)""" - req = fc.models.ListFunctionsRequest(prefix=IPXE_STORAGE_PREFIX) - try: - rsp = clients.fc.list_functions(req) - except openapi.client.UnretryableException: - # AliCloud provides no other way to detect non-functional regions +def run_temp_instance_command(clients, instance, command): + """Run command on temporary instance""" + command_content=' '.join(command) + logger.info("%s: running %s" % (clients.region, command_content)) + req = ecs.models.RunCommandRequest( + region_id=clients.region, + instance_id=[instance], + type='RunShellScript', + command_content=command_content, + ) + rsp = clients.ecs.run_command(req) + invocation = rsp.body.invoke_id + while True: + time.sleep(1) + req = ecs.models.DescribeInvocationResultsRequest( + region_id=clients.region, + invoke_id=invocation, + ) + rsp = clients.ecs.describe_invocation_results(req) + result = rsp.body.invocation.invocation_results.invocation_result[0] + if result.invoke_record_status not in ('Pending', 'Running'): + break + output = base64.b64decode(result.output).decode() + if result.invocation_status != 'Success': + raise RuntimeError(output if output else result.invocation_status) + return result + +def create_temp_instance(clients, family, machine, role): + """Create temporary instance (and remove any stale temporary instances)""" + tag = ecs.models.DescribeInstancesRequestTag( + key=IPXE_STORAGE_TAG, + value=IPXE_STORAGE_TAG, + ) + req = ecs.models.DescribeInstancesRequest( + region_id=clients.region, + tag=[tag], + ) + rsp = clients.ecs.describe_instances(req) + for instance in rsp.body.instances.instance or []: + delete_temp_instance(clients, instance.instance_id) + req = ecs.models.DescribeAvailableResourceRequest( + region_id=clients.region, + destination_resource='Zone', + instance_type=machine, + ) + rsp = clients.ecs.describe_available_resource(req) + if rsp.body.available_zones is None: + # Cannot create instances in this region + logger.warning("%s: no zones support %s" % (clients.region, machine)) return None - funcs = [x.function_name for x in rsp.body.functions or ()] - for func in funcs: - delete_temp_function(clients, func) - buf = io.BytesIO() - with zipfile.ZipFile(buf, 'w') as zfh: - zfh.writestr('index.js', IPXE_CENSORSHIP_BYPASS_FUNCTION) - zf = base64.b64encode(buf.getvalue()).decode() - code = fc.models.InputCodeLocation(zip_file=zf) - func = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4()) - body = fc.models.CreateFunctionInput( - code=code, - function_name=func, - handler='index.handler', - memory_size=FC_MEMORY_SIZE_MB, - role=role, - runtime=FC_NODE_RUNTIME, - timeout=FC_TIMEOUT_SEC, + zone_id = next(x.zone_id + for x in rsp.body.available_zones.available_zone or [] + if x.status == 'Available') + logger.info("%s: creating %s in %s" % (clients.region, machine, zone_id)) + tag = ecs.models.DescribeSecurityGroupsRequestTag( + key=IPXE_SG_TAG, + value=IPXE_SG_TAG, ) - req = fc.models.CreateFunctionRequest(body=body) - rsp = clients.fc.create_function(req) - return func + req = ecs.models.DescribeSecurityGroupsRequest( + region_id=clients.region, + tag=[tag], + ) + rsp = clients.ecs.describe_security_groups(req) + sgs = rsp.body.security_groups.security_group or [] + sg_id = sgs[0].security_group_id + vpc_id = sgs[0].vpc_id + tag = vpc.models.DescribeVSwitchesRequestTag( + key=IPXE_VSWITCH_TAG, + value=IPXE_VSWITCH_TAG, + ) + req = vpc.models.DescribeVSwitchesRequest( + region_id=clients.region, + vpc_id=vpc_id, + zone_id=zone_id, + tag=[tag], + ) + rsp = clients.vpc.describe_vswitches(req) + vswitches = rsp.body.v_switches.v_switch or [] + vswitch_id = vswitches[0].v_switch_id + name = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4()) + sysdisk = ecs.models.RunInstancesRequestSystemDisk( + category=IPXE_STORAGE_DISK_CATEGORY, + ) + now = datetime.datetime.now(datetime.UTC) + lifetime = datetime.timedelta(hours=1) + release = (now + lifetime).strftime('%Y-%m-%dT%H:%M:%SZ') + tag = ecs.models.RunInstancesRequestTag( + key=IPXE_STORAGE_TAG, + value=IPXE_STORAGE_TAG, + ) + req = ecs.models.RunInstancesRequest( + region_id=clients.region, + image_family=family, + instance_type=machine, + instance_name=name, + auto_release_time=release, + ram_role_name=role, + system_disk=sysdisk, + security_group_ids=[sg_id], + v_switch_id=vswitch_id, + internet_charge_type='PayByTraffic', + internet_max_bandwidth_out=100, + tag=[tag], + ) + try: + rsp = clients.ecs.run_instances(req) + except openapi.exceptions.ClientException as exc: + if exc.code in ('RegionUnauthorized', + 'InvalidPeriod.RegionDiscontinued', + 'InvalidInstanceType.ValueNotSupported'): + logger.warning("%s: ECS lied about availability" % clients.region) + return None + raise + instance_id = rsp.body.instance_id_sets.instance_id_set[0] + logger.info("%s: created %s" % (clients.region, instance_id)) + command = ['aliyun', 'configure', 'set', '--mode', 'EcsRamRole', + '--region', clients.region] + run_temp_instance_command(clients, instance_id, command) + return instance_id -def call_temp_function(clients, func, payload): - """Call temporary function""" - hdr = fc.models.InvokeFunctionHeaders( - x_fc_invocation_type='Sync', - x_fc_log_type='Tail', - ) - body = json.dumps(payload) - req = fc.models.InvokeFunctionRequest(body=body) - run = util.models.RuntimeOptions( - autoretry=True, - max_attempts=FC_MAX_ATTEMPTS, - connect_timeout=FC_CONNECT_TIMEOUT_MS, - read_timeout=FC_READ_TIMEOUT_MS, - ) - rsp = clients.fc.invoke_function_with_options(func, req, hdr, run) - log = base64.b64decode(rsp.headers.get('x-fc-log-result', b'')).decode() - if rsp.status_code != http.HTTPStatus.OK: - raise RuntimeError(rsp) - if 'x-fc-error-type' in rsp.headers: - raise RuntimeError(log) - -def delete_temp_bucket(clients, func, bucket): +def delete_temp_bucket(clients, instance, bucket): """Remove temporary bucket""" + logger.info("%s: deleting %s" % (clients.region, bucket)) assert bucket.startswith(IPXE_STORAGE_PREFIX) - payload = {'bucket': bucket} - call_temp_function(clients, func, payload) - req = oss.models.DeleteBucketRequest(bucket=bucket) - clients.oss.delete_bucket(req) + command = ['aliyun', 'oss', 'rm', 'oss://%s' % bucket, + '--bucket', '--recursive', '--force', + '--endpoint', ('oss-%s-internal.aliyuncs.com' % clients.region)] + run_temp_instance_command(clients, instance, command) -def create_temp_bucket(clients, func): +def create_temp_bucket(clients, instance): """Create temporary bucket (and remove any stale temporary buckets)""" prefix = '%s%s-' % (IPXE_STORAGE_PREFIX, clients.region) req = oss.models.ListBucketsRequest(prefix=prefix) rsp = clients.oss.list_buckets(req) buckets = [x.name for x in rsp.buckets or ()] for bucket in buckets: - delete_temp_bucket(clients, func, bucket) + delete_temp_bucket(clients, instance, bucket) bucket = ('%s%s' % (prefix, uuid4()))[:OSS_BUCKET_NAME_LEN] req = oss.models.PutBucketRequest(bucket=bucket) try: @@ -251,25 +268,31 @@ def create_temp_bucket(clients, func): except oss.exceptions.OperationError as exc: # AliCloud provides no other way to detect non-functional regions if exc.unwrap().code == OSS_FORBIDDEN_REGION_CODE: + logger.warning("%s: non-functional OSS" % clients.region) return None raise exc + logger.info("%s: created %s" % (clients.region, bucket)) return bucket def upload_image(clients, bucket, image): """Upload disk image to uncensored bucket""" + logger.info("%s: uploading %s" % (clients.region, image.name)) key = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4()) req = oss.models.PutObjectRequest(bucket=bucket, key=key) rsp = clients.oss.put_object_from_file(req, image.path) return key -def copy_images(clients, func, bucket, source, keys): +def copy_images(clients, instance, bucket, source): """Copy disk images to bucket from uncensored bucket""" - payload = { - 'bucket': bucket, - 'source': source, - 'keys': keys, - } - call_temp_function(clients, func, payload) + logger.info("%s: syncing from %s" % (clients.region, source['bucket'])) + command = ['aliyun', 'oss', 'sync', + 'oss://%s' % source['bucket'], 'syncdir', + '--endpoint', ('oss-%s.aliyuncs.com' % source['region'])] + run_temp_instance_command(clients, instance, command) + logger.info("%s: syncing to %s" % (clients.region, bucket)) + command = ['aliyun', 'oss', 'sync', 'syncdir', 'oss://%s' % bucket, + '--endpoint', ('oss-%s-internal.aliyuncs.com' % clients.region)] + run_temp_instance_command(clients, instance, command) def delete_images(clients, name): """Remove existing images""" @@ -280,6 +303,15 @@ def delete_images(clients, name): ) rsp = clients.ecs.describe_images(req) for image in rsp.body.images.image or (): + logger.info("%s: deleting %s (%s)" % + (clients.region, image.image_name, image.image_id)) + if image.is_public: + req = ecs.models.ModifyImageSharePermissionRequest( + region_id=clients.region, + image_id=image.image_id, + is_public=False, + ) + rsp = clients.ecs.modify_image_share_permission(req) req = ecs.models.DeleteImageRequest( region_id=clients.region, image_id=image.image_id @@ -290,6 +322,7 @@ def import_image(clients, image, bucket, key, public, overwrite): """Import image""" if overwrite: delete_images(clients, image.name) + logger.info("%s: importing %s" % (clients.region, image.name)) disk = ecs.models.ImportImageRequestDiskDeviceMapping( disk_image_size = 1, format = 'RAW', @@ -331,10 +364,13 @@ def import_image(clients, image, bucket, key, public, overwrite): is_public=True, ) rsp = clients.ecs.modify_image_share_permission(req) + logger.info("%s: imported %s (%s)" % + (clients.region, image.name, image_id)) return image_id # Parse command-line arguments parser = argparse.ArgumentParser(description="Import Alibaba Cloud image") +parser.add_argument('--verbose', '-v', action='count', default=0) parser.add_argument('--name', '-n', help="Base image name") parser.add_argument('--family', '-f', default='ipxe', @@ -345,14 +381,25 @@ parser.add_argument('--overwrite', action='store_true', help="Overwrite any existing image with same name") parser.add_argument('--region', '-r', action='append', help="AliCloud region(s)") -parser.add_argument('--fc-role', '-F', default="AliyunFcDefaultRole", - help="AliCloud role for censorship bypass function") +parser.add_argument('--role', '-R', default="iPXECensorshipBypassRole", + help="AliCloud OSS censorship bypass role") +parser.add_argument('--helper-family', + default="acs:alibaba_cloud_linux_4_lts_x64", + help="Helper OS image family") +parser.add_argument('--helper-machine', default="ecs.e-c1m1.large", + help="Helper machine type") parser.add_argument('image', nargs='+', help="iPXE disk image") args = parser.parse_args() +# Configure logging +loglevels = [logging.WARNING, logging.INFO, logging.DEBUG] +verbosity = min(args.verbose, (len(loglevels) - 1)) +logging.basicConfig(level=loglevels[verbosity]) + # Use default name if none specified if not args.name: - args.name = '%s-%s' % (args.family, date.today().strftime('%Y%m%d')) + args.name = '%s-%s' % (args.family, + datetime.date.today().strftime('%Y%m%d')) # Construct image list images = [image(x, args.family, args.name) for x in args.image] @@ -361,28 +408,26 @@ images = [image(x, args.family, args.name) for x in args.image] if not args.region: args.region = all_regions() -# Look up resource names -fcrole = role_arn(args.fc_role) - # Construct per-region clients -account = account_id() -clients = {region: all_clients(region, account) for region in args.region} +clients = {region: all_clients(region) for region in args.region} -# Create temporary functions in each region +# Create temporary instances in each region with ThreadPoolExecutor(max_workers=len(args.region)) as executor: - futures = {executor.submit(create_temp_function, + futures = {executor.submit(create_temp_instance, clients=clients[region], - role=fcrole): region + family=args.helper_family, + machine=args.helper_machine, + role=args.role): region for region in args.region} - funcs = {futures[x]: x.result() for x in as_completed(futures)} + instances = {futures[x]: x.result() for x in as_completed(futures)} -# Create temporary buckets in each region (requires function to exist) +# Create temporary buckets in each region (requires instance to exist) with ThreadPoolExecutor(max_workers=len(args.region)) as executor: futures = {executor.submit(create_temp_bucket, clients=clients[region], - func=funcs[region]): region + instance=instances[region]): region for region in args.region - if funcs[region] is not None} + if instances[region] is not None} buckets = {futures[x]: x.result() for x in as_completed(futures)} # Select an uncensored region with functioning object storage @@ -405,12 +450,11 @@ with ThreadPoolExecutor(max_workers=len(args.region)) as executor: source = {'region': uncensored, 'bucket': buckets[uncensored]} futures = {executor.submit(copy_images, clients=clients[region], - func=funcs[region], + instance=instances[region], bucket=buckets[region], - source=source, - keys=list(keys.values())): region + source=source): region for region in args.region - if funcs[region] is not None and buckets[region] is not None + if instances[region] is not None and buckets[region] is not None and region != uncensored} done = {futures[x]: x.result() for x in as_completed(futures)} @@ -425,32 +469,33 @@ with ThreadPoolExecutor(max_workers=len(imports)) as executor: public=args.public, overwrite=args.overwrite): (region, image) for region, image in imports - if funcs[region] is not None and buckets[region] is not None} + if instances[region] is not None and buckets[region] is not None} results = {futures[x]: x.result() for x in as_completed(futures)} # Remove temporary buckets with ThreadPoolExecutor(max_workers=len(args.region)) as executor: futures = {executor.submit(delete_temp_bucket, clients=clients[region], - func=funcs[region], + instance=instances[region], bucket=buckets[region]): region for region in args.region - if funcs[region] is not None and buckets[region] is not None} + if instances[region] is not None and buckets[region] is not None} done = {futures[x]: x.result() for x in as_completed(futures)} -# Remove temporary functions +# Remove temporary instances with ThreadPoolExecutor(max_workers=len(args.region)) as executor: - futures = {executor.submit(delete_temp_function, + futures = {executor.submit(delete_temp_instance, clients=clients[region], - func=funcs[region]): region + instance=instances[region], + retry=True): region for region in args.region - if funcs[region] is not None} + if instances[region] is not None} done = {futures[x]: x.result() for x in as_completed(futures)} # Show created images for region, image in imports: mark = "(*)" if region == uncensored else "" - result = ("[no FC]" if funcs[region] is None else + result = ("[no ECS]" if instances[region] is None else "[no OSS]" if buckets[region] is None else results[(region, image)]) print("%s%s %s (%s) %s" % (region, mark, image.name, image.family, result))