[cloud] Do not rely on CopyImage to import images to Alibaba Cloud

The CopyImage API call does work, but is unacceptably slow due to rate
limiting.  Importing a full set of images to all regions can take
several hours (and is likely to fail at some point due to transient
errors in making API calls).

Resort to a mixture of strategies to get images imported to all
regions:

  - For regions with working OSS that are not blocked by Chinese state
    censorship laws, upload the image files to an OSS bucket and then
    import the images.

  - For regions with working OSS that are blocked by Chinese state
    censorship laws but that have working FC, use a temporary FC
    function to copy the image files from the uncensored OSS buckets
    and then import the images.  Attempt downloads from a variety of
    uncensored buckets, since cross-region OSS traffic tends to
    experience a failure rate of around 10% of requests.

  - For regions that have working OSS but are blocked by Chinese state
    censorship laws and do not have working FC, or for regions that
    don't even have working OSS, resort to using CopyImage to copy the
    previously imported images from another region.  Spread the
    imports across as many source regions as possible to minimise the
    effect of the CopyImage rate limiting.

Signed-off-by: Michael Brown <mcb30@ipxe.org>
This commit is contained in:
Michael Brown
2026-04-21 16:06:40 +01:00
parent 7e54e75a2f
commit d146b28b50
+337 -79
View File
@@ -1,16 +1,48 @@
#!/usr/bin/env python3
#
# Doing anything in Alibaba Cloud is unnecessarily difficult and
# tedious due to a combination of poor and inconsistent API design,
# high API call failure rates, and Chinese state censorship laws.
#
# We resort to a mixture of strategies to get images imported to all
# regions:
#
# - For regions with working OSS that are not blocked by Chinese
# state censorship laws, upload the image files to an OSS bucket
# and then import the images.
#
# - For regions with working OSS that are blocked by Chinese state
# censorship laws but that have working FC, use a temporary FC
# function to copy the image files from the uncensored OSS buckets
# and then import the images. Attempt downloads from a variety of
# uncensored buckets, since cross-region OSS traffic tends to
# experience a failure rate of around 10% of requests.
#
# - For regions that have working OSS but are blocked by Chinese
# state censorship laws and do not have working FC, or for regions
# that don't even have working OSS, resort to using CopyImage to
# copy the previously imported images from another region. Spread
# the imports across as many source regions as possible to
# minimise the effect of the CopyImage rate limiting.
import argparse
import base64
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor, as_completed
import datetime
from itertools import cycle
import http
import io
from itertools import cycle, groupby
import json
import logging
from operator import itemgetter
from pathlib import Path
import random
import subprocess
import tempfile
import time
from uuid import uuid4
import zipfile
import alibabacloud_credentials as credentials
import alibabacloud_credentials.client
@@ -18,7 +50,16 @@ import alibabacloud_credentials.models
import alibabacloud_ecs20140526 as ecs
import alibabacloud_ecs20140526.client
import alibabacloud_ecs20140526.models
import alibabacloud_fc20230330 as fc
import alibabacloud_fc20230330.client
import alibabacloud_fc20230330.models
import alibabacloud_oss_v2 as oss
import alibabacloud_ram20150501 as ram
import alibabacloud_ram20150501.client
import alibabacloud_ram20150501.models
import alibabacloud_sts20150401 as sts
import alibabacloud_sts20150401.client
import alibabacloud_sts20150401.models
import alibabacloud_tea_openapi as openapi
import alibabacloud_tea_openapi.client
import alibabacloud_tea_openapi.models
@@ -26,25 +67,91 @@ import alibabacloud_tea_util as util
import alibabacloud_tea_util.client
import alibabacloud_tea_util.models
# For regions in mainland China, the Chinese state censorship laws
# prohibit direct access to OSS bucket contents.
#
# We work around this restriction by creating a temporary ECS instance
# in each region to access OSS via the internal OSS endpoints, which
# are not subject to these restrictions. Yes, this is absurd.
logger = logging.getLogger('ali-import')
ECS_ENDPOINT = 'ecs.aliyuncs.com'
RAM_ENDPOINT = 'ram.aliyuncs.com'
STS_ENDPOINT = 'sts.aliyuncs.com'
FC_NODE_RUNTIME = 'nodejs20'
FC_TIMEOUT_SEC = 120
FC_MEMORY_SIZE_MB = 128
FC_SOURCE_COUNT = 10
OSS_FORBIDDEN_REGION_CODE = 'ForbidCreateNewBucket'
OSS_BUCKET_NAME_LEN = 63
IPXE_STORAGE_PREFIX = 'ipxe-upload-temp-'
IPXE_STORAGE_TAG = 'ipxe-upload-temp'
Clients = namedtuple('Clients', ['region', 'ecs', 'oss'])
POLL_INTERVAL_SEC = 5
POLL_MAX_RETRIES = 100
# For regions in mainland China, the Chinese state censorship laws
# prohibit direct access to OSS bucket contents.
#
# We work around this restriction by creating a temporary Function
# Compute function in each region to access OSS via the internal OSS
# endpoints, which are not subject to these restrictions. Yes, this
# is somewhat absurd.
#
# Node.js source for the temporary censorship-bypass FC function.
#
# The function receives a JSON payload of the form
#
#   {"bucket": <name>, "sources": {<object key>: [<url>, ...], ...}}
#
# and reconciles the bucket contents against the wanted key set:
# missing keys are downloaded (trying each candidate URL in turn,
# with a 15s timeout per attempt) and uploaded via the internal OSS
# endpoint; stale keys under the temporary prefix are deleted.  An
# empty "sources" dict therefore deletes all temporary objects.
IPXE_CENSORSHIP_BYPASS_FUNCTION = f'''
const prefix = "{IPXE_STORAGE_PREFIX}";
''' + '''
const assert = require("node:assert");
const OSS = require("ali-oss");
exports.handler = async (event, context) => {
const payload = JSON.parse(event.toString());
console.log(JSON.stringify(payload));
const sources = payload.sources || {};
const dest = new OSS({
region: "oss-" + context.region,
internal: true,
bucket: payload.bucket,
accessKeyId: context.credentials.accessKeyId,
accessKeySecret: context.credentials.accessKeySecret,
stsToken: context.credentials.securityToken,
});
const current = ((await dest.listV2({prefix: prefix})).objects || [])
.map(x => x.name);
const wanted = Object.keys(sources);
const add = wanted.filter(x => ! current.includes(x));
const del = current.filter(x => ! wanted.includes(x));
assert(add.every(x => x.startsWith(prefix)));
assert(del.every(x => x.startsWith(prefix)));
if (add.length)
console.log("Creating: " + add.sort().join(", "));
if (del.length)
console.log("Deleting: " + del.sort().join(", "));
const copy = async (key) => {
for (const url of sources[key]) {
console.log("Downloading " + key + " from " + url);
try {
const download = await fetch(url, {signal: AbortSignal.timeout(15000)});
if (! download.ok)
throw new Error(download.status);
const content = await download.arrayBuffer();
console.log("Downloaded " + key);
console.log("Uploading " + key);
await dest.put(key, Buffer.from(content));
console.log("Uploaded " + key);
return;
} catch (err) {
console.error("Download failed", err);
}
}
throw new Error("All downloads failed for " + key);
};
await Promise.all([
...add.map(copy),
...(del.length ? [dest.deleteMulti(del)] : []),
]);
console.log("Finished");
};
'''
Clients = namedtuple('Clients', ['region', 'censored', 'ecs', 'fc', 'oss'])
Image = namedtuple('Image',
['path', 'family', 'name', 'arch', 'mode', 'public'])
['path', 'family', 'name', 'arch', 'mode', 'key', 'public'])
def image(filename, basefamily, basename, public):
"""Construct image description"""
@@ -69,7 +176,8 @@ def image(filename, basefamily, basename, public):
name = '%s%s' % (basename, suffix)
arch = uefi[0] if len(uefi) == 1 else None if uefi else 'x86_64'
mode = 'UEFI' if uefi else 'BIOS'
return Image(path, family, name, arch, mode, public)
key = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
return Image(path, family, name, arch, mode, key, public)
def all_regions():
"""Get list of all regions"""
@@ -81,63 +189,177 @@ def all_regions():
regions = sorted(x.region_id for x in rsp.body.regions.region)
return regions
def account_id():
    """Get account ID.

    Returns the numeric ID of the calling AliCloud account, as
    reported by the STS GetCallerIdentity API.  This is needed to
    construct the account-specific Function Compute endpoint.
    """
    cred = credentials.client.Client()
    conf = openapi.models.Config(credential=cred, endpoint=STS_ENDPOINT)
    client = sts.client.Client(conf)
    rsp = client.get_caller_identity()
    return rsp.body.account_id
def role_arn(name):
    """Look up the resource name (ARN) of the named RAM role"""
    conf = openapi.models.Config(
        credential=credentials.client.Client(),
        endpoint=RAM_ENDPOINT,
    )
    client = ram.client.Client(conf)
    req = ram.models.GetRoleRequest(role_name=name)
    return client.get_role(req).body.role.arn
def all_clients(region, account):
    """Build the full set of per-region service clients.

    Constructs ECS, Function Compute and OSS clients for a single
    region, flagging mainland-China regions as censored.
    """
    cred = credentials.client.Client()
    # FC endpoints are account-specific, unlike ECS endpoints
    fc_endpoint = '%s.%s.fc.aliyuncs.com' % (account, region)
    ecsconf = openapi.models.Config(credential=cred, region_id=region)
    fcconf = openapi.models.Config(credential=cred, endpoint=fc_endpoint)
    # The OSS v2 SDK uses its own credential provider mechanism
    provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    ossconf = oss.config.Config(credentials_provider=provider, region=region)
    return Clients(
        region=region,
        censored=region.startswith('cn-'),
        ecs=ecs.client.Client(ecsconf),
        fc=fc.client.Client(fcconf),
        oss=oss.client.Client(ossconf),
    )
def delete_temp_function(clients, func):
    """Remove temporary FC function.

    The function name must carry the temporary-resource prefix, as a
    safeguard against ever deleting an unrelated function.
    """
    logger.info("delete function %s %s" % (clients.region, func))
    assert func.startswith(IPXE_STORAGE_PREFIX)
    clients.fc.delete_function(func)
def create_temp_function(clients, role):
    """Create temporary function (and remove any stale temporary functions).

    Returns the new function name, or None if the region has no
    working Function Compute service or does not need a function
    (i.e. is not censored).
    """
    req = fc.models.ListFunctionsRequest(prefix=IPXE_STORAGE_PREFIX)
    try:
        rsp = clients.fc.list_functions(req)
    except openapi.client.UnretryableException:
        # AliCloud provides no other way to detect non-working regions
        return None
    # Clean up any leftovers from previous (possibly aborted) runs
    funcs = [x.function_name for x in rsp.body.functions or ()]
    for func in funcs:
        delete_temp_function(clients, func)
    if not clients.censored:
        # Functions are not required in uncensored regions
        return None
    # Package the censorship-bypass handler as an in-memory zip file
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, 'w') as zfh:
        zfh.writestr('index.js', IPXE_CENSORSHIP_BYPASS_FUNCTION)
    zf = base64.b64encode(buf.getvalue()).decode()
    code = fc.models.InputCodeLocation(zip_file=zf)
    func = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
    body = fc.models.CreateFunctionInput(
        code=code,
        function_name=func,
        handler='index.handler',
        memory_size=FC_MEMORY_SIZE_MB,
        role=role,
        runtime=FC_NODE_RUNTIME,
        timeout=FC_TIMEOUT_SEC,
    )
    req = fc.models.CreateFunctionRequest(body=body)
    rsp = clients.fc.create_function(req)
    logger.info("create function %s %s" % (clients.region, func))
    return func
def call_temp_function(clients, func, payload):
    """Invoke temporary FC function synchronously.

    Raises RuntimeError if the invocation fails at the HTTP level or
    if the function itself reports an error (in which case the tail
    of the function log is included in the exception).
    """
    headers = fc.models.InvokeFunctionHeaders(
        x_fc_invocation_type='Sync',
        x_fc_log_type='Tail',
    )
    request = fc.models.InvokeFunctionRequest(body=json.dumps(payload))
    # Generous timeouts: the function may spend minutes copying objects
    options = util.models.RuntimeOptions(
        autoretry=True,
        max_attempts=5,
        connect_timeout=10000,
        read_timeout=120000,
    )
    rsp = clients.fc.invoke_function_with_options(func, request,
                                                  headers, options)
    log = base64.b64decode(rsp.headers.get('x-fc-log-result', b'')).decode()
    if rsp.status_code != http.HTTPStatus.OK:
        raise RuntimeError(rsp)
    if 'x-fc-error-type' in rsp.headers:
        raise RuntimeError(log)
def delete_temp_bucket(clients, func, bucket):
    """Remove temporary bucket.

    The bucket name must carry the temporary-resource prefix, as a
    safeguard against ever deleting an unrelated bucket.  The bucket
    contents must be deleted before the bucket itself can be removed.
    """
    logger.info("delete bucket %s %s" % (clients.region, bucket))
    assert bucket.startswith(IPXE_STORAGE_PREFIX)
    # Delete bucket contents
    if not clients.censored:
        # Uncensored region: use OSS API calls to delete contents
        req = oss.models.ListObjectsV2Request(
            bucket=bucket,
            prefix=IPXE_STORAGE_PREFIX,
        )
        rsp = clients.oss.list_objects_v2(req)
        delete = [x.key for x in rsp.contents or ()]
        if delete:
            req = oss.models.DeleteMultipleObjectsRequest(
                bucket=bucket,
                objects=[oss.models.DeleteObject(x) for x in delete],
            )
            rsp = clients.oss.delete_multiple_objects(req)
    elif func:
        # Censored region with FC: use function to delete contents
        # (a payload with no "sources" makes it delete everything)
        payload = {'bucket': bucket}
        call_temp_function(clients, func, payload)
    else:
        # Censored region without FC: assume bucket must be empty,
        # since we could not have uploaded to it in the first place
        pass
    # Delete the now-empty bucket
    req = oss.models.DeleteBucketRequest(bucket=bucket)
    rsp = clients.oss.delete_bucket(req)
def create_temp_bucket(clients, func):
    """Create temporary bucket (and remove any stale temporary buckets).

    Returns the new bucket name, or None if the region has no usable
    OSS (either because bucket creation is administratively forbidden,
    or because the region is censored and has no FC function through
    which uploads could be performed).
    """
    # Bucket names are globally scoped, so include the region name
    prefix = '%s%s-' % (IPXE_STORAGE_PREFIX, clients.region)
    req = oss.models.ListBucketsRequest(prefix=prefix)
    rsp = clients.oss.list_buckets(req)
    # Clean up any leftovers from previous (possibly aborted) runs
    buckets = [x.name for x in rsp.buckets or ()]
    for bucket in buckets:
        delete_temp_bucket(clients, func, bucket)
    if clients.censored and not func:
        # We cannot use OSS in censored regions with no Function Compute
        return None
    # Truncate to the maximum permitted bucket name length
    bucket = ('%s%s' % (prefix, uuid4()))[:OSS_BUCKET_NAME_LEN]
    req = oss.models.PutBucketRequest(bucket=bucket)
    try:
        rsp = clients.oss.put_bucket(req)
    except oss.exceptions.OperationError as exc:
        # AliCloud provides no other way to detect non-working regions
        if exc.unwrap().code == OSS_FORBIDDEN_REGION_CODE:
            return None
        raise exc
    logger.info("create bucket %s %s" % (clients.region, bucket))
    return bucket
def upload_object(clients, bucket, image):
    """Upload disk image object to uncensored bucket.

    Returns a presigned download URL for the uploaded object, which
    censored regions can later fetch via their temporary FC function.
    """
    logger.info("upload %s %s" % (clients.region, image.name))
    req = oss.models.PutObjectRequest(bucket=bucket, key=image.key)
    rsp = clients.oss.put_object_from_file(req, image.path)
    # Presign a download URL so that no credentials need be shared
    # with the downloading FC functions
    req = oss.models.GetObjectRequest(bucket=bucket, key=image.key)
    rsp = clients.oss.presign(req)
    return rsp.url
def copy_objects(clients, bucket, func, uploads):
    """Copy disk image objects into a censored region's bucket.

    Builds a payload mapping each object key to a randomly chosen
    selection of presigned source URLs (duplicates permitted), then
    invokes the region's temporary FC function to perform the copies
    from inside the censorship boundary.
    """
    logger.info("upload %s (censored)" % clients.region)
    # Group the presigned URLs by object key; groupby requires its
    # input to be sorted on the grouping key
    pairs = sorted((image.key, url)
                   for (region, image), url in uploads.items())
    sources = {}
    for key, group in groupby(pairs, key=itemgetter(0)):
        urls = [url for _, url in group]
        # Pick several candidate URLs per key to tolerate the ~10%
        # cross-region OSS failure rate
        sources[key] = random.choices(urls, k=FC_SOURCE_COUNT)
    payload = {
        'bucket': bucket,
        'sources': sources,
    }
    call_temp_function(clients, func, payload)
def delete_image(clients, name):
"""Remove existing image (if applicable)"""
@@ -148,7 +370,7 @@ def delete_image(clients, name):
)
rsp = clients.ecs.describe_images(req)
for image in rsp.body.images.image or ():
logger.info("%s: deleting %s (%s)" %
logger.info("delete image %s %s (%s)" %
(clients.region, image.image_name, image.image_id))
if image.is_public:
req = ecs.models.ModifyImageSharePermissionRequest(
@@ -165,8 +387,9 @@ def delete_image(clients, name):
def wait_for_task(clients, task_id):
"""Wait for task to complete"""
while True:
time.sleep(5)
status = 'Unknowable'
for i in range(POLL_MAX_RETRIES):
time.sleep(POLL_INTERVAL_SEC)
req = ecs.models.DescribeTasksRequest(
region_id=clients.region,
task_ids=task_id,
@@ -182,8 +405,9 @@ def wait_for_task(clients, task_id):
def wait_for_image(clients, image_id):
"""Wait for image to become available"""
while True:
time.sleep(5)
status = 'Unknowable'
for i in range(POLL_MAX_RETRIES):
time.sleep(POLL_INTERVAL_SEC)
req = ecs.models.DescribeImagesRequest(
region_id=clients.region,
image_id=image_id,
@@ -198,14 +422,14 @@ def wait_for_image(clients, image_id):
if status != 'Available':
raise RuntimeError(status)
def import_image(clients, image, bucket, key):
def import_image(clients, image, bucket):
"""Import image"""
logger.info("%s: importing %s" % (clients.region, image.name))
logger.info("import %s %s" % (clients.region, image.name))
disk = ecs.models.ImportImageRequestDiskDeviceMapping(
disk_image_size = 1,
format = 'RAW',
ossbucket = bucket,
ossobject = key,
ossobject = image.key,
)
req = ecs.models.ImportImageRequest(
region_id=clients.region,
@@ -219,14 +443,14 @@ def import_image(clients, image, bucket, key):
task_id = rsp.body.task_id
wait_for_task(clients, task_id)
wait_for_image(clients, image_id)
logger.info("%s: imported %s (%s)" %
logger.info("image %s %s (%s)" %
(clients.region, image.name, image_id))
return image_id
def copy_image(clients, image, image_id, censored):
"""Copy imported image to censored region"""
logger.info("%s: copying %s (%s) to %s" %
(clients.region, image.name, image_id, censored.region))
logger.info("import %s %s via %s" %
(censored.region, image.name, clients.region))
req = ecs.models.CopyImageRequest(
region_id=clients.region,
image_id=image_id,
@@ -236,14 +460,12 @@ def copy_image(clients, image, image_id, censored):
rsp = clients.ecs.copy_image(req)
copy_id = rsp.body.image_id
wait_for_image(censored, copy_id)
logger.info("%s: copied %s (%s) to %s" %
(clients.region, image.name, copy_id, censored.region))
logger.info("image %s %s (%s)" % (censored.region, image.name, copy_id))
return copy_id
def finalise_image(clients, image, image_id):
"""Finalise image attributes and permissions"""
logger.info("%s: finalising %s (%s)" %
(clients.region, image.name, image_id))
logger.info("finalise %s %s (%s)" % (clients.region, image.name, image_id))
req = ecs.models.ModifyImageAttributeRequest(
region_id=clients.region,
image_id=image_id,
@@ -271,6 +493,8 @@ parser.add_argument('--overwrite', action='store_true',
help="Overwrite any existing image with same name")
parser.add_argument('--region', '-r', action='append',
help="AliCloud region(s)")
parser.add_argument('--role', '-R', default="AliyunFcDefaultRole",
help="AliCloud role for censorship bypass function")
parser.add_argument('image', nargs='+', help="iPXE disk image")
args = parser.parse_args()
@@ -278,6 +502,7 @@ args = parser.parse_args()
loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]
verbosity = min(args.verbose, (len(loglevels) - 1))
logging.basicConfig(level=loglevels[verbosity])
logging.getLogger('apscheduler').setLevel(logging.WARNING)
# Use default name if none specified
if not args.name:
@@ -290,59 +515,82 @@ regions = args.region or all_regions()
# Construct image list
images = [image(x, args.family, args.name, args.public) for x in args.image]
imports = [(region, image) for region in regions for image in images]
workers = len(imports)
# Look up resource names
fcrole = role_arn(args.role)
# Construct per-region clients
clients = {region: all_clients(region) for region in regions}
account = account_id()
clients = {region: all_clients(region, account) for region in regions}
# Delete existing images from all regions, if applicable
if args.overwrite:
with ThreadPoolExecutor(max_workers=len(imports)) as executor:
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(delete_image,
clients=clients[region],
name=image.name): (region, image)
for region, image in imports}
done = {futures[x]: x.result() for x in as_completed(futures)}
# Create temporary buckets in all uncensored regions
with ThreadPoolExecutor(max_workers=len(regions)) as executor:
# Create temporary function in each censored region with usable FC
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(create_temp_function,
clients=clients[region],
role=fcrole): region
for region in regions}
funcs = {futures[x]: x.result() for x in as_completed(futures)}
# Create temporary bucket in each region with usable OSS
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(create_temp_bucket,
clients=clients[region]): region
clients=clients[region],
func=funcs[region]): region
for region in regions}
buckets = {futures[x]: x.result() for x in as_completed(futures)}
if not any(buckets.values()):
parser.error("At least one non-Chinese region is required")
# Upload images directly to uncensored regions
with ThreadPoolExecutor(max_workers=len(imports)) as executor:
futures = {executor.submit(upload_image,
# Upload image objects directly to each uncensored region with usable OSS
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(upload_object,
clients=clients[region],
bucket=buckets[region],
image=image): (region, image)
for region, image in imports if buckets[region]}
keys = {futures[x]: x.result() for x in as_completed(futures)}
for region, image in imports
if buckets[region] and not funcs[region]}
uploads = {futures[x]: x.result() for x in as_completed(futures)}
if not uploads:
parser.error("At least one working non-Chinese region is required")
# Import images to uncensored regions
with ThreadPoolExecutor(max_workers=len(imports)) as executor:
# Copy image objects to each censored region with usable OSS and usable FC
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(copy_objects,
clients=clients[region],
bucket=buckets[region],
func=funcs[region],
uploads=uploads): region
for region in regions
if buckets[region] and funcs[region]}
done = {futures[x]: x.result() for x in as_completed(futures)}
# Import images in each region with usable OSS
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(import_image,
clients=clients[region],
image=image,
bucket=buckets[region],
key=keys[(region, image)]): (region, image)
bucket=buckets[region]): (region, image)
for region, image in imports if buckets[region]}
results = {futures[x]: x.result() for x in as_completed(futures)}
# Select source uncensored region for each copy
# Copy images to regions without usable OSS
#
# Copies are rate-limited by source region, so spread the copies
# across all available uncensored regions.
# across all available regions with imported images.
#
copies = [(region, censored, image) for region, (censored, image) in zip(
cycle(region for region in regions if buckets[region]),
((region, image) for region, image in imports if not buckets[region]),
)]
# Copy images to censored regions
with ThreadPoolExecutor(max_workers=len(imports)) as executor:
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(copy_image,
clients=clients[region],
censored=clients[censored],
@@ -350,10 +598,11 @@ with ThreadPoolExecutor(max_workers=len(imports)) as executor:
image_id=results[(region, image)]):
(censored, image)
for region, censored, image in copies}
results.update({futures[x]: x.result() for x in as_completed(futures)})
copied = {futures[x]: x.result() for x in as_completed(futures)}
results.update(copied)
# Finalise images
with ThreadPoolExecutor(max_workers=len(imports)) as executor:
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(finalise_image,
clients=clients[region],
image=image,
@@ -363,13 +612,22 @@ with ThreadPoolExecutor(max_workers=len(imports)) as executor:
done = {futures[x]: x.result() for x in as_completed(futures)}
# Remove temporary buckets
with ThreadPoolExecutor(max_workers=len(regions)) as executor:
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(delete_temp_bucket,
clients=clients[region],
func=funcs[region],
bucket=buckets[region]): region
for region in regions if buckets[region]}
done = {futures[x]: x.result() for x in as_completed(futures)}
# Remove temporary functions
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {executor.submit(delete_temp_function,
clients=clients[region],
func=funcs[region]): region
for region in regions if funcs[region]}
done = {futures[x]: x.result() for x in as_completed(futures)}
# Show created images
for region, image in imports:
image_id = results[(region, image)]