#!/usr/bin/env python3 import argparse import base64 from collections import namedtuple from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import date import http import io import json from pathlib import Path import subprocess import tempfile import time from uuid import uuid4 import zipfile import alibabacloud_credentials as credentials import alibabacloud_credentials.client import alibabacloud_credentials.models import alibabacloud_ecs20140526 as ecs import alibabacloud_ecs20140526.client import alibabacloud_ecs20140526.models import alibabacloud_fc20230330 as fc import alibabacloud_fc20230330.client import alibabacloud_fc20230330.models import alibabacloud_oss_v2 as oss import alibabacloud_ram20150501 as ram import alibabacloud_ram20150501.client import alibabacloud_ram20150501.models import alibabacloud_sts20150401 as sts import alibabacloud_sts20150401.client import alibabacloud_sts20150401.models import alibabacloud_tea_openapi as openapi import alibabacloud_tea_openapi.client import alibabacloud_tea_openapi.models import alibabacloud_tea_util as util import alibabacloud_tea_util.client import alibabacloud_tea_util.models ECS_ENDPOINT = 'ecs.aliyuncs.com' RAM_ENDPOINT = 'ram.aliyuncs.com' STS_ENDPOINT = 'sts.aliyuncs.com' FC_NODE_RUNTIME = 'nodejs20' FC_MAX_ATTEMPTS = 5 FC_CONNECT_TIMEOUT_MS = 10000 FC_READ_TIMEOUT_MS = 60000 FC_TIMEOUT_SEC = 60 FC_MEMORY_SIZE_MB = 128 OSS_FORBIDDEN_REGION_CODE = 'ForbidCreateNewBucket' OSS_BUCKET_NAME_LEN = 63 IPXE_STORAGE_PREFIX = 'ipxe-upload-temp-' # For regions in mainland China, the Chinese state censorship laws # prohibit direct access to OSS bucket contents. # # We work around this restriction by creating a temporary Function # Compute function in each region to access OSS via the internal OSS # endpoints, which are not subject to these restrictions. Yes, this # is somewhat absurd. # IPXE_CENSORSHIP_BYPASS_FUNCTION = f''' const prefix = "{IPXE_STORAGE_PREFIX}"; ''' + ''' const assert = require("node:assert"); const OSS = require("ali-oss"); exports.handler = async (event, context) => { const payload = JSON.parse(event.toString()); console.log(JSON.stringify(payload)); const src = payload.source && new OSS({ region: "oss-" + payload.source.region, bucket: payload.source.bucket, accessKeyId: context.credentials.accessKeyId, accessKeySecret: context.credentials.accessKeySecret, stsToken: context.credentials.securityToken, }); const dst = new OSS({ region: "oss-" + context.region, internal: true, bucket: payload.bucket, accessKeyId: context.credentials.accessKeyId, accessKeySecret: context.credentials.accessKeySecret, stsToken: context.credentials.securityToken, }); const add = payload.keys || []; const del = ((await dst.listV2({prefix: prefix})).objects || []) .map(x => x.name).filter(x => ! add.includes(x)); assert(add.every(x => x.startsWith(prefix))); assert(del.every(x => x.startsWith(prefix))); if (add.length) console.log("Creating: " + add.sort().join(", ")); if (del.length) console.log("Deleting: " + del.sort().join(", ")); await Promise.all([ ...add.map(async (x) => dst.putStream(x, (await src.getStream(x)).stream)), ...(del.length ? [dst.deleteMulti(del)] : []), ]); }; ''' Clients = namedtuple('Clients', ['region', 'ecs', 'fc', 'oss']) Image = namedtuple('Image', ['path', 'family', 'name', 'arch', 'mode']) def image(filename, basefamily, basename): """Construct image description""" with tempfile.NamedTemporaryFile(mode='w+t') as mtoolsrc: mtoolsrc.writelines([ 'drive D:', f'file="{filename}"', 'drive P:', f'file="{filename}"', 'partition=4', ]) mtoolsrc.flush() mdir = subprocess.run(['mdir', '-b', 'D:/EFI/BOOT', 'P:/EFI/BOOT'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, env={'MTOOLSRC': mtoolsrc.name}) mapping = { b'BOOTX64.EFI': 'x86_64', b'BOOTAA64.EFI': 'arm64', } uefi = [v for k, v in mapping.items() if k in mdir.stdout] suffix = ('-uefi-%s' % uefi[0].replace('_', '-') if len(uefi) == 1 else '-uefi-multi' if uefi else '') path = Path(filename) family = '%s%s' % (basefamily, suffix) name = '%s%s' % (basename, suffix) arch = uefi[0] if len(uefi) == 1 else None if uefi else 'x86_64' mode = 'UEFI' if uefi else 'BIOS' return Image(path, family, name, arch, mode) def all_regions(): """Get list of all regions""" cred = credentials.client.Client() conf = openapi.models.Config(credential=cred, endpoint=ECS_ENDPOINT) client = ecs.client.Client(conf) req = ecs.models.DescribeRegionsRequest() rsp = client.describe_regions(req) regions = sorted(x.region_id for x in rsp.body.regions.region) return regions def account_id(): """Get account ID""" cred = credentials.client.Client() conf = openapi.models.Config(credential=cred, endpoint=STS_ENDPOINT) client = sts.client.Client(conf) rsp = client.get_caller_identity() return rsp.body.account_id def role_arn(name): """Get role resource name""" cred = credentials.client.Client() conf = openapi.models.Config(credential=cred, endpoint=RAM_ENDPOINT) client = ram.client.Client(conf) req = ram.models.GetRoleRequest(role_name=name) rsp = client.get_role(req) return rsp.body.role.arn def all_clients(region, account): """Construct all per-region clients""" cred = credentials.client.Client() ecsconf = openapi.models.Config(credential=cred, region_id=region) fcep = '%s.%s.fc.aliyuncs.com' % (account, region) fcconf = openapi.models.Config(credential=cred, endpoint=fcep) osscred = oss.credentials.EnvironmentVariableCredentialsProvider() ossconf = oss.config.Config(credentials_provider=osscred, region=region) clients = Clients( region=region, ecs=ecs.client.Client(ecsconf), fc=fc.client.Client(fcconf), oss=oss.client.Client(ossconf), ) return clients def delete_temp_function(clients, func): """Remove temporary function""" assert func.startswith(IPXE_STORAGE_PREFIX) clients.fc.delete_function(func) def create_temp_function(clients, role): """Create temporary function (and remove any stale temporary functions)""" req = fc.models.ListFunctionsRequest(prefix=IPXE_STORAGE_PREFIX) try: rsp = clients.fc.list_functions(req) except openapi.client.UnretryableException: # AliCloud provides no other way to detect non-functional regions return None funcs = [x.function_name for x in rsp.body.functions or ()] for func in funcs: delete_temp_function(clients, func) buf = io.BytesIO() with zipfile.ZipFile(buf, 'w') as zfh: zfh.writestr('index.js', IPXE_CENSORSHIP_BYPASS_FUNCTION) zf = base64.b64encode(buf.getvalue()).decode() code = fc.models.InputCodeLocation(zip_file=zf) func = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4()) body = fc.models.CreateFunctionInput( code=code, function_name=func, handler='index.handler', memory_size=FC_MEMORY_SIZE_MB, role=role, runtime=FC_NODE_RUNTIME, timeout=FC_TIMEOUT_SEC, ) req = fc.models.CreateFunctionRequest(body=body) rsp = clients.fc.create_function(req) return func def call_temp_function(clients, func, payload): """Call temporary function""" hdr = fc.models.InvokeFunctionHeaders( x_fc_invocation_type='Sync', x_fc_log_type='Tail', ) body = json.dumps(payload) req = fc.models.InvokeFunctionRequest(body=body) run = util.models.RuntimeOptions( autoretry=True, max_attempts=FC_MAX_ATTEMPTS, connect_timeout=FC_CONNECT_TIMEOUT_MS, read_timeout=FC_READ_TIMEOUT_MS, ) rsp = clients.fc.invoke_function_with_options(func, req, hdr, run) log = base64.b64decode(rsp.headers.get('x-fc-log-result', b'')).decode() if rsp.status_code != http.HTTPStatus.OK: raise RuntimeError(rsp) if 'x-fc-error-type' in rsp.headers: raise RuntimeError(log) def delete_temp_bucket(clients, func, bucket): """Remove temporary bucket""" assert bucket.startswith(IPXE_STORAGE_PREFIX) payload = {'bucket': bucket} call_temp_function(clients, func, payload) req = oss.models.DeleteBucketRequest(bucket=bucket) clients.oss.delete_bucket(req) def create_temp_bucket(clients, func): """Create temporary bucket (and remove any stale temporary buckets)""" prefix = '%s%s-' % (IPXE_STORAGE_PREFIX, clients.region) req = oss.models.ListBucketsRequest(prefix=prefix) rsp = clients.oss.list_buckets(req) buckets = [x.name for x in rsp.buckets or ()] for bucket in buckets: delete_temp_bucket(clients, func, bucket) bucket = ('%s%s' % (prefix, uuid4()))[:OSS_BUCKET_NAME_LEN] req = oss.models.PutBucketRequest(bucket=bucket) try: rsp = clients.oss.put_bucket(req) except oss.exceptions.OperationError as exc: # AliCloud provides no other way to detect non-functional regions if exc.unwrap().code == OSS_FORBIDDEN_REGION_CODE: return None raise exc return bucket def upload_image(clients, bucket, image): """Upload disk image to uncensored bucket""" key = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4()) req = oss.models.PutObjectRequest(bucket=bucket, key=key) rsp = clients.oss.put_object_from_file(req, image.path) return key def copy_images(clients, func, bucket, source, keys): """Copy disk images to bucket from uncensored bucket""" payload = { 'bucket': bucket, 'source': source, 'keys': keys, } call_temp_function(clients, func, payload) def delete_images(clients, name): """Remove existing images""" req = ecs.models.DescribeImagesRequest( region_id=clients.region, image_name=name, image_owner_alias='self', ) rsp = clients.ecs.describe_images(req) for image in rsp.body.images.image or (): req = ecs.models.DeleteImageRequest( region_id=clients.region, image_id=image.image_id ) rsp = clients.ecs.delete_image(req) def import_image(clients, image, bucket, key, public, overwrite): """Import image""" if overwrite: delete_images(clients, image.name) disk = ecs.models.ImportImageRequestDiskDeviceMapping( disk_image_size = 1, format = 'RAW', ossbucket = bucket, ossobject = key, ) req = ecs.models.ImportImageRequest( region_id=clients.region, image_name=image.name, architecture=image.arch, boot_mode=image.mode, disk_device_mapping=[disk], ) rsp = clients.ecs.import_image(req) image_id = rsp.body.image_id task_id = rsp.body.task_id while True: time.sleep(5) req = ecs.models.DescribeTasksRequest( region_id=clients.region, task_ids=task_id, ) rsp = clients.ecs.describe_tasks(req) status = rsp.body.task_set.task[0].task_status if status not in ('Waiting', 'Processing'): break if status != 'Finished': raise RuntimeError(status) req = ecs.models.ModifyImageAttributeRequest( region_id=clients.region, image_id=image_id, image_family=image.family, ) rsp = clients.ecs.modify_image_attribute(req) if public: req = ecs.models.ModifyImageSharePermissionRequest( region_id=clients.region, image_id=image_id, is_public=True, ) rsp = clients.ecs.modify_image_share_permission(req) return image_id # Parse command-line arguments parser = argparse.ArgumentParser(description="Import Alibaba Cloud image") parser.add_argument('--name', '-n', help="Base image name") parser.add_argument('--family', '-f', default='ipxe', help="Base family name") parser.add_argument('--public', '-p', action='store_true', help="Make image public") parser.add_argument('--overwrite', action='store_true', help="Overwrite any existing image with same name") parser.add_argument('--region', '-r', action='append', help="AliCloud region(s)") parser.add_argument('--fc-role', '-F', default="AliyunFcDefaultRole", help="AliCloud role for censorship bypass function") parser.add_argument('image', nargs='+', help="iPXE disk image") args = parser.parse_args() # Use default name if none specified if not args.name: args.name = '%s-%s' % (args.family, date.today().strftime('%Y%m%d')) # Construct image list images = [image(x, args.family, args.name) for x in args.image] # Use all regions if none specified if not args.region: args.region = all_regions() # Look up resource names fcrole = role_arn(args.fc_role) # Construct per-region clients account = account_id() clients = {region: all_clients(region, account) for region in args.region} # Create temporary functions in each region with ThreadPoolExecutor(max_workers=len(args.region)) as executor: futures = {executor.submit(create_temp_function, clients=clients[region], role=fcrole): region for region in args.region} funcs = {futures[x]: x.result() for x in as_completed(futures)} # Create temporary buckets in each region (requires function to exist) with ThreadPoolExecutor(max_workers=len(args.region)) as executor: futures = {executor.submit(create_temp_bucket, clients=clients[region], func=funcs[region]): region for region in args.region if funcs[region] is not None} buckets = {futures[x]: x.result() for x in as_completed(futures)} # Select an uncensored region with functioning object storage uncensored = next((k for k, v in buckets.items() if v is not None and not k.startswith('cn-')), None) if uncensored is None: parser.error("At least one available uncensored region is required") # Upload images directly to chosen uncensored region with ThreadPoolExecutor(max_workers=len(images)) as executor: futures = {executor.submit(upload_image, clients=clients[uncensored], bucket=buckets[uncensored], image=image): image for image in images} keys = {futures[x]: x.result() for x in as_completed(futures)} # Copy images to all other regions with ThreadPoolExecutor(max_workers=len(args.region)) as executor: source = {'region': uncensored, 'bucket': buckets[uncensored]} futures = {executor.submit(copy_images, clients=clients[region], func=funcs[region], bucket=buckets[region], source=source, keys=list(keys.values())): region for region in args.region if funcs[region] is not None and buckets[region] is not None and region != uncensored} done = {futures[x]: x.result() for x in as_completed(futures)} # Import all images imports = [(region, image) for region in args.region for image in images] with ThreadPoolExecutor(max_workers=len(imports)) as executor: futures = {executor.submit(import_image, clients=clients[region], image=image, bucket=buckets[region], key=keys[image], public=args.public, overwrite=args.overwrite): (region, image) for region, image in imports if funcs[region] is not None and buckets[region] is not None} results = {futures[x]: x.result() for x in as_completed(futures)} # Remove temporary buckets with ThreadPoolExecutor(max_workers=len(args.region)) as executor: futures = {executor.submit(delete_temp_bucket, clients=clients[region], func=funcs[region], bucket=buckets[region]): region for region in args.region if funcs[region] is not None and buckets[region] is not None} done = {futures[x]: x.result() for x in as_completed(futures)} # Remove temporary functions with ThreadPoolExecutor(max_workers=len(args.region)) as executor: futures = {executor.submit(delete_temp_function, clients=clients[region], func=funcs[region]): region for region in args.region if funcs[region] is not None} done = {futures[x]: x.result() for x in as_completed(futures)} # Show created images for region, image in imports: mark = "(*)" if region == uncensored else "" result = ("[no FC]" if funcs[region] is None else "[no OSS]" if buckets[region] is None else results[(region, image)]) print("%s%s %s (%s) %s" % (region, mark, image.name, image.family, result))