[cloud] Do not rely on Function Compute to import images to Alibaba Cloud

Function Compute is unsupported in several Alibaba Cloud regions.
Rewrite the censorship bypass mechanism to access OSS buckets using a
temporary ECS instance instead of a temporary Function Compute
function.

Importing images now requires that the account has been prepared using
the "ali-setup" script, which creates the necessary role, VPCs, and
vSwitches to allow ECS instances to be launched in each region.

Signed-off-by: Michael Brown <mcb30@ipxe.org>
This commit is contained in:
Michael Brown
2026-04-15 16:02:23 +01:00
parent 09c17f76c3
commit 13ee60c198
+229 -184
View File
@@ -4,16 +4,13 @@ import argparse
import base64
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date
import http
import io
import json
import datetime
import logging
from pathlib import Path
import subprocess
import tempfile
import time
from uuid import uuid4
import zipfile
import alibabacloud_credentials as credentials
import alibabacloud_credentials.client
@@ -21,87 +18,39 @@ import alibabacloud_credentials.models
import alibabacloud_ecs20140526 as ecs
import alibabacloud_ecs20140526.client
import alibabacloud_ecs20140526.models
import alibabacloud_fc20230330 as fc
import alibabacloud_fc20230330.client
import alibabacloud_fc20230330.models
import alibabacloud_oss_v2 as oss
import alibabacloud_ram20150501 as ram
import alibabacloud_ram20150501.client
import alibabacloud_ram20150501.models
import alibabacloud_sts20150401 as sts
import alibabacloud_sts20150401.client
import alibabacloud_sts20150401.models
import alibabacloud_tea_openapi as openapi
import alibabacloud_tea_openapi.client
import alibabacloud_tea_openapi.models
import alibabacloud_tea_util as util
import alibabacloud_tea_util.client
import alibabacloud_tea_util.models
import alibabacloud_vpc20160428 as vpc
import alibabacloud_vpc20160428.client
import alibabacloud_vpc20160428.models
# For regions in mainland China, the Chinese state censorship laws
# prohibit direct access to OSS bucket contents.
#
# We work around this restriction by creating a temporary ECS instance
# in each region to access OSS via the internal OSS endpoints, which
# are not subject to these restrictions. Yes, this is absurd.
logger = logging.getLogger('ali-import')
ECS_ENDPOINT = 'ecs.aliyuncs.com'
RAM_ENDPOINT = 'ram.aliyuncs.com'
STS_ENDPOINT = 'sts.aliyuncs.com'
FC_NODE_RUNTIME = 'nodejs20'
FC_MAX_ATTEMPTS = 5
FC_CONNECT_TIMEOUT_MS = 10000
FC_READ_TIMEOUT_MS = 60000
FC_TIMEOUT_SEC = 60
FC_MEMORY_SIZE_MB = 128
OSS_FORBIDDEN_REGION_CODE = 'ForbidCreateNewBucket'
OSS_BUCKET_NAME_LEN = 63
IPXE_STORAGE_PREFIX = 'ipxe-upload-temp-'
IPXE_STORAGE_TAG = 'ipxe-upload-temp'
IPXE_STORAGE_DISK_CATEGORY = 'cloud_essd'
# For regions in mainland China, the Chinese state censorship laws
# prohibit direct access to OSS bucket contents.
#
# We work around this restriction by creating a temporary Function
# Compute function in each region to access OSS via the internal OSS
# endpoints, which are not subject to these restrictions. Yes, this
# is somewhat absurd.
#
IPXE_CENSORSHIP_BYPASS_FUNCTION = f'''
const prefix = "{IPXE_STORAGE_PREFIX}";
''' + '''
const assert = require("node:assert");
const OSS = require("ali-oss");
exports.handler = async (event, context) => {
const payload = JSON.parse(event.toString());
console.log(JSON.stringify(payload));
const src = payload.source && new OSS({
region: "oss-" + payload.source.region,
bucket: payload.source.bucket,
accessKeyId: context.credentials.accessKeyId,
accessKeySecret: context.credentials.accessKeySecret,
stsToken: context.credentials.securityToken,
});
const dst = new OSS({
region: "oss-" + context.region,
internal: true,
bucket: payload.bucket,
accessKeyId: context.credentials.accessKeyId,
accessKeySecret: context.credentials.accessKeySecret,
stsToken: context.credentials.securityToken,
});
const add = payload.keys || [];
const del = ((await dst.listV2({prefix: prefix})).objects || [])
.map(x => x.name).filter(x => ! add.includes(x));
assert(add.every(x => x.startsWith(prefix)));
assert(del.every(x => x.startsWith(prefix)));
if (add.length)
console.log("Creating: " + add.sort().join(", "));
if (del.length)
console.log("Deleting: " + del.sort().join(", "));
await Promise.all([
...add.map(async (x) => dst.putStream(x, (await src.getStream(x)).stream)),
...(del.length ? [dst.deleteMulti(del)] : []),
]);
};
'''
IPXE_SG_TAG = 'ipxe-default-sg'
IPXE_VSWITCH_TAG = 'ipxe-default-vswitch'
Clients = namedtuple('Clients', ['region', 'ecs', 'fc', 'oss'])
Clients = namedtuple('Clients', ['region', 'ecs', 'oss', 'vpc'])
Image = namedtuple('Image', ['path', 'family', 'name', 'arch', 'mode'])
def image(filename, basefamily, basename):
@@ -139,111 +88,179 @@ def all_regions():
regions = sorted(x.region_id for x in rsp.body.regions.region)
return regions
def account_id():
"""Get account ID"""
cred = credentials.client.Client()
conf = openapi.models.Config(credential=cred, endpoint=STS_ENDPOINT)
client = sts.client.Client(conf)
rsp = client.get_caller_identity()
return rsp.body.account_id
def role_arn(name):
"""Get role resource name"""
cred = credentials.client.Client()
conf = openapi.models.Config(credential=cred, endpoint=RAM_ENDPOINT)
client = ram.client.Client(conf)
req = ram.models.GetRoleRequest(role_name=name)
rsp = client.get_role(req)
return rsp.body.role.arn
def all_clients(region, account):
def all_clients(region):
"""Construct all per-region clients"""
cred = credentials.client.Client()
ecsconf = openapi.models.Config(credential=cred, region_id=region)
fcep = '%s.%s.fc.aliyuncs.com' % (account, region)
fcconf = openapi.models.Config(credential=cred, endpoint=fcep)
osscred = oss.credentials.EnvironmentVariableCredentialsProvider()
ossconf = oss.config.Config(credentials_provider=osscred, region=region)
vpcconf = openapi.models.Config(credential=cred, region_id=region)
clients = Clients(
region=region,
ecs=ecs.client.Client(ecsconf),
fc=fc.client.Client(fcconf),
oss=oss.client.Client(ossconf),
vpc=vpc.client.Client(vpcconf),
)
return clients
def delete_temp_function(clients, func):
"""Remove temporary function"""
assert func.startswith(IPXE_STORAGE_PREFIX)
clients.fc.delete_function(func)
def delete_temp_instance(clients, instance, retry=False):
"""Remove temporary instance"""
logger.info("%s: deleting %s" % (clients.region, instance))
while True:
req = ecs.models.DeleteInstanceRequest(
instance_id=instance,
force=True,
force_stop=True,
)
try:
rsp = clients.ecs.delete_instance(req)
except openapi.exceptions.ClientException:
# Very recently created instances often cannot be
# terminated until some undocumented part of the control
# plane decides that enough time has elapsed
if retry:
time.sleep(1)
continue
raise
break
def create_temp_function(clients, role):
"""Create temporary function (and remove any stale temporary functions)"""
req = fc.models.ListFunctionsRequest(prefix=IPXE_STORAGE_PREFIX)
try:
rsp = clients.fc.list_functions(req)
except openapi.client.UnretryableException:
# AliCloud provides no other way to detect non-functional regions
def run_temp_instance_command(clients, instance, command):
"""Run command on temporary instance"""
command_content=' '.join(command)
logger.info("%s: running %s" % (clients.region, command_content))
req = ecs.models.RunCommandRequest(
region_id=clients.region,
instance_id=[instance],
type='RunShellScript',
command_content=command_content,
)
rsp = clients.ecs.run_command(req)
invocation = rsp.body.invoke_id
while True:
time.sleep(1)
req = ecs.models.DescribeInvocationResultsRequest(
region_id=clients.region,
invoke_id=invocation,
)
rsp = clients.ecs.describe_invocation_results(req)
result = rsp.body.invocation.invocation_results.invocation_result[0]
if result.invoke_record_status not in ('Pending', 'Running'):
break
output = base64.b64decode(result.output).decode()
if result.invocation_status != 'Success':
raise RuntimeError(output if output else result.invocation_status)
return result
def create_temp_instance(clients, family, machine, role):
"""Create temporary instance (and remove any stale temporary instances)"""
tag = ecs.models.DescribeInstancesRequestTag(
key=IPXE_STORAGE_TAG,
value=IPXE_STORAGE_TAG,
)
req = ecs.models.DescribeInstancesRequest(
region_id=clients.region,
tag=[tag],
)
rsp = clients.ecs.describe_instances(req)
for instance in rsp.body.instances.instance or []:
delete_temp_instance(clients, instance.instance_id)
req = ecs.models.DescribeAvailableResourceRequest(
region_id=clients.region,
destination_resource='Zone',
instance_type=machine,
)
rsp = clients.ecs.describe_available_resource(req)
if rsp.body.available_zones is None:
# Cannot create instances in this region
logger.warning("%s: no zones support %s" % (clients.region, machine))
return None
funcs = [x.function_name for x in rsp.body.functions or ()]
for func in funcs:
delete_temp_function(clients, func)
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zfh:
zfh.writestr('index.js', IPXE_CENSORSHIP_BYPASS_FUNCTION)
zf = base64.b64encode(buf.getvalue()).decode()
code = fc.models.InputCodeLocation(zip_file=zf)
func = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
body = fc.models.CreateFunctionInput(
code=code,
function_name=func,
handler='index.handler',
memory_size=FC_MEMORY_SIZE_MB,
role=role,
runtime=FC_NODE_RUNTIME,
timeout=FC_TIMEOUT_SEC,
zone_id = next(x.zone_id
for x in rsp.body.available_zones.available_zone or []
if x.status == 'Available')
logger.info("%s: creating %s in %s" % (clients.region, machine, zone_id))
tag = ecs.models.DescribeSecurityGroupsRequestTag(
key=IPXE_SG_TAG,
value=IPXE_SG_TAG,
)
req = fc.models.CreateFunctionRequest(body=body)
rsp = clients.fc.create_function(req)
return func
req = ecs.models.DescribeSecurityGroupsRequest(
region_id=clients.region,
tag=[tag],
)
rsp = clients.ecs.describe_security_groups(req)
sgs = rsp.body.security_groups.security_group or []
sg_id = sgs[0].security_group_id
vpc_id = sgs[0].vpc_id
tag = vpc.models.DescribeVSwitchesRequestTag(
key=IPXE_VSWITCH_TAG,
value=IPXE_VSWITCH_TAG,
)
req = vpc.models.DescribeVSwitchesRequest(
region_id=clients.region,
vpc_id=vpc_id,
zone_id=zone_id,
tag=[tag],
)
rsp = clients.vpc.describe_vswitches(req)
vswitches = rsp.body.v_switches.v_switch or []
vswitch_id = vswitches[0].v_switch_id
name = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
sysdisk = ecs.models.RunInstancesRequestSystemDisk(
category=IPXE_STORAGE_DISK_CATEGORY,
)
now = datetime.datetime.now(datetime.UTC)
lifetime = datetime.timedelta(hours=1)
release = (now + lifetime).strftime('%Y-%m-%dT%H:%M:%SZ')
tag = ecs.models.RunInstancesRequestTag(
key=IPXE_STORAGE_TAG,
value=IPXE_STORAGE_TAG,
)
req = ecs.models.RunInstancesRequest(
region_id=clients.region,
image_family=family,
instance_type=machine,
instance_name=name,
auto_release_time=release,
ram_role_name=role,
system_disk=sysdisk,
security_group_ids=[sg_id],
v_switch_id=vswitch_id,
internet_charge_type='PayByTraffic',
internet_max_bandwidth_out=100,
tag=[tag],
)
try:
rsp = clients.ecs.run_instances(req)
except openapi.exceptions.ClientException as exc:
if exc.code in ('RegionUnauthorized',
'InvalidPeriod.RegionDiscontinued',
'InvalidInstanceType.ValueNotSupported'):
logger.warning("%s: ECS lied about availability" % clients.region)
return None
raise
instance_id = rsp.body.instance_id_sets.instance_id_set[0]
logger.info("%s: created %s" % (clients.region, instance_id))
command = ['aliyun', 'configure', 'set', '--mode', 'EcsRamRole',
'--region', clients.region]
run_temp_instance_command(clients, instance_id, command)
return instance_id
def call_temp_function(clients, func, payload):
"""Call temporary function"""
hdr = fc.models.InvokeFunctionHeaders(
x_fc_invocation_type='Sync',
x_fc_log_type='Tail',
)
body = json.dumps(payload)
req = fc.models.InvokeFunctionRequest(body=body)
run = util.models.RuntimeOptions(
autoretry=True,
max_attempts=FC_MAX_ATTEMPTS,
connect_timeout=FC_CONNECT_TIMEOUT_MS,
read_timeout=FC_READ_TIMEOUT_MS,
)
rsp = clients.fc.invoke_function_with_options(func, req, hdr, run)
log = base64.b64decode(rsp.headers.get('x-fc-log-result', b'')).decode()
if rsp.status_code != http.HTTPStatus.OK:
raise RuntimeError(rsp)
if 'x-fc-error-type' in rsp.headers:
raise RuntimeError(log)
def delete_temp_bucket(clients, func, bucket):
def delete_temp_bucket(clients, instance, bucket):
"""Remove temporary bucket"""
logger.info("%s: deleting %s" % (clients.region, bucket))
assert bucket.startswith(IPXE_STORAGE_PREFIX)
payload = {'bucket': bucket}
call_temp_function(clients, func, payload)
req = oss.models.DeleteBucketRequest(bucket=bucket)
clients.oss.delete_bucket(req)
command = ['aliyun', 'oss', 'rm', 'oss://%s' % bucket,
'--bucket', '--recursive', '--force',
'--endpoint', ('oss-%s-internal.aliyuncs.com' % clients.region)]
run_temp_instance_command(clients, instance, command)
def create_temp_bucket(clients, func):
def create_temp_bucket(clients, instance):
"""Create temporary bucket (and remove any stale temporary buckets)"""
prefix = '%s%s-' % (IPXE_STORAGE_PREFIX, clients.region)
req = oss.models.ListBucketsRequest(prefix=prefix)
rsp = clients.oss.list_buckets(req)
buckets = [x.name for x in rsp.buckets or ()]
for bucket in buckets:
delete_temp_bucket(clients, func, bucket)
delete_temp_bucket(clients, instance, bucket)
bucket = ('%s%s' % (prefix, uuid4()))[:OSS_BUCKET_NAME_LEN]
req = oss.models.PutBucketRequest(bucket=bucket)
try:
@@ -251,25 +268,31 @@ def create_temp_bucket(clients, func):
except oss.exceptions.OperationError as exc:
# AliCloud provides no other way to detect non-functional regions
if exc.unwrap().code == OSS_FORBIDDEN_REGION_CODE:
logger.warning("%s: non-functional OSS" % clients.region)
return None
raise exc
logger.info("%s: created %s" % (clients.region, bucket))
return bucket
def upload_image(clients, bucket, image):
"""Upload disk image to uncensored bucket"""
logger.info("%s: uploading %s" % (clients.region, image.name))
key = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
req = oss.models.PutObjectRequest(bucket=bucket, key=key)
rsp = clients.oss.put_object_from_file(req, image.path)
return key
def copy_images(clients, func, bucket, source, keys):
def copy_images(clients, instance, bucket, source):
"""Copy disk images to bucket from uncensored bucket"""
payload = {
'bucket': bucket,
'source': source,
'keys': keys,
}
call_temp_function(clients, func, payload)
logger.info("%s: syncing from %s" % (clients.region, source['bucket']))
command = ['aliyun', 'oss', 'sync',
'oss://%s' % source['bucket'], 'syncdir',
'--endpoint', ('oss-%s.aliyuncs.com' % source['region'])]
run_temp_instance_command(clients, instance, command)
logger.info("%s: syncing to %s" % (clients.region, bucket))
command = ['aliyun', 'oss', 'sync', 'syncdir', 'oss://%s' % bucket,
'--endpoint', ('oss-%s-internal.aliyuncs.com' % clients.region)]
run_temp_instance_command(clients, instance, command)
def delete_images(clients, name):
"""Remove existing images"""
@@ -280,6 +303,15 @@ def delete_images(clients, name):
)
rsp = clients.ecs.describe_images(req)
for image in rsp.body.images.image or ():
logger.info("%s: deleting %s (%s)" %
(clients.region, image.image_name, image.image_id))
if image.is_public:
req = ecs.models.ModifyImageSharePermissionRequest(
region_id=clients.region,
image_id=image.image_id,
is_public=False,
)
rsp = clients.ecs.modify_image_share_permission(req)
req = ecs.models.DeleteImageRequest(
region_id=clients.region,
image_id=image.image_id
@@ -290,6 +322,7 @@ def import_image(clients, image, bucket, key, public, overwrite):
"""Import image"""
if overwrite:
delete_images(clients, image.name)
logger.info("%s: importing %s" % (clients.region, image.name))
disk = ecs.models.ImportImageRequestDiskDeviceMapping(
disk_image_size = 1,
format = 'RAW',
@@ -331,10 +364,13 @@ def import_image(clients, image, bucket, key, public, overwrite):
is_public=True,
)
rsp = clients.ecs.modify_image_share_permission(req)
logger.info("%s: imported %s (%s)" %
(clients.region, image.name, image_id))
return image_id
# Parse command-line arguments
parser = argparse.ArgumentParser(description="Import Alibaba Cloud image")
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument('--name', '-n',
help="Base image name")
parser.add_argument('--family', '-f', default='ipxe',
@@ -345,14 +381,25 @@ parser.add_argument('--overwrite', action='store_true',
help="Overwrite any existing image with same name")
parser.add_argument('--region', '-r', action='append',
help="AliCloud region(s)")
parser.add_argument('--fc-role', '-F', default="AliyunFcDefaultRole",
help="AliCloud role for censorship bypass function")
parser.add_argument('--role', '-R', default="iPXECensorshipBypassRole",
help="AliCloud OSS censorship bypass role")
parser.add_argument('--helper-family',
default="acs:alibaba_cloud_linux_4_lts_x64",
help="Helper OS image family")
parser.add_argument('--helper-machine', default="ecs.e-c1m1.large",
help="Helper machine type")
parser.add_argument('image', nargs='+', help="iPXE disk image")
args = parser.parse_args()
# Configure logging
loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]
verbosity = min(args.verbose, (len(loglevels) - 1))
logging.basicConfig(level=loglevels[verbosity])
# Use default name if none specified
if not args.name:
args.name = '%s-%s' % (args.family, date.today().strftime('%Y%m%d'))
args.name = '%s-%s' % (args.family,
datetime.date.today().strftime('%Y%m%d'))
# Construct image list
images = [image(x, args.family, args.name) for x in args.image]
@@ -361,28 +408,26 @@ images = [image(x, args.family, args.name) for x in args.image]
if not args.region:
args.region = all_regions()
# Look up resource names
fcrole = role_arn(args.fc_role)
# Construct per-region clients
account = account_id()
clients = {region: all_clients(region, account) for region in args.region}
clients = {region: all_clients(region) for region in args.region}
# Create temporary functions in each region
# Create temporary instances in each region
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
futures = {executor.submit(create_temp_function,
futures = {executor.submit(create_temp_instance,
clients=clients[region],
role=fcrole): region
family=args.helper_family,
machine=args.helper_machine,
role=args.role): region
for region in args.region}
funcs = {futures[x]: x.result() for x in as_completed(futures)}
instances = {futures[x]: x.result() for x in as_completed(futures)}
# Create temporary buckets in each region (requires function to exist)
# Create temporary buckets in each region (requires instance to exist)
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
futures = {executor.submit(create_temp_bucket,
clients=clients[region],
func=funcs[region]): region
instance=instances[region]): region
for region in args.region
if funcs[region] is not None}
if instances[region] is not None}
buckets = {futures[x]: x.result() for x in as_completed(futures)}
# Select an uncensored region with functioning object storage
@@ -405,12 +450,11 @@ with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
source = {'region': uncensored, 'bucket': buckets[uncensored]}
futures = {executor.submit(copy_images,
clients=clients[region],
func=funcs[region],
instance=instances[region],
bucket=buckets[region],
source=source,
keys=list(keys.values())): region
source=source): region
for region in args.region
if funcs[region] is not None and buckets[region] is not None
if instances[region] is not None and buckets[region] is not None
and region != uncensored}
done = {futures[x]: x.result() for x in as_completed(futures)}
@@ -425,32 +469,33 @@ with ThreadPoolExecutor(max_workers=len(imports)) as executor:
public=args.public,
overwrite=args.overwrite): (region, image)
for region, image in imports
if funcs[region] is not None and buckets[region] is not None}
if instances[region] is not None and buckets[region] is not None}
results = {futures[x]: x.result() for x in as_completed(futures)}
# Remove temporary buckets
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
futures = {executor.submit(delete_temp_bucket,
clients=clients[region],
func=funcs[region],
instance=instances[region],
bucket=buckets[region]): region
for region in args.region
if funcs[region] is not None and buckets[region] is not None}
if instances[region] is not None and buckets[region] is not None}
done = {futures[x]: x.result() for x in as_completed(futures)}
# Remove temporary functions
# Remove temporary instances
with ThreadPoolExecutor(max_workers=len(args.region)) as executor:
futures = {executor.submit(delete_temp_function,
futures = {executor.submit(delete_temp_instance,
clients=clients[region],
func=funcs[region]): region
instance=instances[region],
retry=True): region
for region in args.region
if funcs[region] is not None}
if instances[region] is not None}
done = {futures[x]: x.result() for x in as_completed(futures)}
# Show created images
for region, image in imports:
mark = "(*)" if region == uncensored else ""
result = ("[no FC]" if funcs[region] is None else
result = ("[no ECS]" if instances[region] is None else
"[no OSS]" if buckets[region] is None else
results[(region, image)])
print("%s%s %s (%s) %s" % (region, mark, image.name, image.family, result))