mirror of
https://github.com/ipxe/ipxe
synced 2026-05-18 10:00:30 +03:00
2360ec2a79
The underlying snapshots are not automatically deleted along with the image, and there is no flag that can be set to cause them to be automatically deleted. Tag the underlying snapshots for deletion before deleting the image, delete the image, and then delete any such tagged snapshots (including any that may remain from a previous failed deletion attempt). Signed-off-by: Michael Brown <mcb30@ipxe.org>
700 lines
27 KiB
Python
Executable File
700 lines
27 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
#
|
|
# Doing anything in Alibaba Cloud is unnecessarily difficult and
|
|
# tedious due to a combination of poor and inconsistent API design,
|
|
# high API call failure rates, and Chinese state censorship laws.
|
|
#
|
|
# We resort to a mixture of strategies to get images imported to all
|
|
# regions:
|
|
#
|
|
# - For regions with working OSS that are not blocked by Chinese
|
|
# state censorship laws, upload the image files to an OSS bucket
|
|
# and then import the images.
|
|
#
|
|
# - For regions with working OSS that are blocked by Chinese state
|
|
# censorship laws but that have working FC, use a temporary FC
|
|
# function to copy the image files from the uncensored OSS buckets
|
|
# and then import the images. Attempt downloads from a variety of
|
|
# uncensored buckets, since cross-region OSS traffic tends to
|
|
# experience a failure rate of around 10% of requests.
|
|
#
|
|
# - For regions that have working OSS but are blocked by Chinese
|
|
# state censorship laws and do not have working FC, or for regions
|
|
# that don't even have working OSS, resort to using CopyImage to
|
|
# copy the previously imported images from another region. Spread
|
|
# the imports across as many source regions as possible to
|
|
# minimise the effect of the CopyImage rate limiting.
|
|
|
|
import argparse
|
|
import base64
|
|
from collections import namedtuple
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import datetime
|
|
import http
|
|
import io
|
|
from itertools import cycle, groupby
|
|
import json
|
|
import logging
|
|
from operator import itemgetter
|
|
from pathlib import Path
|
|
import random
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
from uuid import uuid4
|
|
import zipfile
|
|
|
|
import alibabacloud_credentials as credentials
|
|
import alibabacloud_credentials.client
|
|
import alibabacloud_credentials.models
|
|
import alibabacloud_ecs20140526 as ecs
|
|
import alibabacloud_ecs20140526.client
|
|
import alibabacloud_ecs20140526.models
|
|
import alibabacloud_fc20230330 as fc
|
|
import alibabacloud_fc20230330.client
|
|
import alibabacloud_fc20230330.models
|
|
import alibabacloud_oss_v2 as oss
|
|
import alibabacloud_ram20150501 as ram
|
|
import alibabacloud_ram20150501.client
|
|
import alibabacloud_ram20150501.models
|
|
import alibabacloud_sts20150401 as sts
|
|
import alibabacloud_sts20150401.client
|
|
import alibabacloud_sts20150401.models
|
|
import alibabacloud_tea_openapi as openapi
|
|
import alibabacloud_tea_openapi.client
|
|
import alibabacloud_tea_openapi.models
|
|
import alibabacloud_tea_util as util
|
|
import alibabacloud_tea_util.client
|
|
import alibabacloud_tea_util.models
|
|
|
|
logger = logging.getLogger('ali-import')
|
|
|
|
ECS_ENDPOINT = 'ecs.aliyuncs.com'
|
|
RAM_ENDPOINT = 'ram.aliyuncs.com'
|
|
STS_ENDPOINT = 'sts.aliyuncs.com'
|
|
|
|
FC_NODE_RUNTIME = 'nodejs20'
|
|
FC_TIMEOUT_SEC = 120
|
|
FC_MEMORY_SIZE_MB = 128
|
|
FC_SOURCE_COUNT = 10
|
|
|
|
OSS_FORBIDDEN_REGION_CODE = 'ForbidCreateNewBucket'
|
|
OSS_BUCKET_NAME_LEN = 63
|
|
|
|
IPXE_STORAGE_PREFIX = 'ipxe-upload-temp-'
|
|
IPXE_SNAPSHOT_DELETE_TAG = 'ipxe-snapshot-delete'
|
|
|
|
POLL_INTERVAL_SEC = 5
|
|
POLL_MAX_RETRIES = 100
|
|
|
|
# Experimentation suggests Alibaba Cloud API calls are extremely
|
|
# unreliable, with a failure rate around 1%. It is therefore
|
|
# necessary to allow for retrying basically every API call.
|
|
#
|
|
# Some API calls (e.g. DescribeImages or ModifyImageAttribute) are
|
|
# naturally idempotent and so safe to retry. Some non-idempotent API
|
|
# calls (e.g. CopyImage) support explicit idempotence tokens. The
|
|
# remaining API calls may simply fail on a retry, if the original
|
|
# request happened to succeed but failed to return a response.
|
|
#
|
|
# We could write convoluted retry logic around the non-idempotent
|
|
# calls, but this would substantially increase the complexity of the
|
|
# already unnecessarily complex code. For now, we assume that
|
|
# retrying non-idempotent requests is probably more likely to fix
|
|
# transient failures than to cause additional problems.
|
|
#
|
|
RUNTIME_OPTS = util.models.RuntimeOptions(
|
|
autoretry=True,
|
|
max_attempts=5,
|
|
connect_timeout=10000,
|
|
read_timeout=120000,
|
|
)
|
|
|
|
# For regions in mainland China, the Chinese state censorship laws
|
|
# prohibit direct access to OSS bucket contents.
|
|
#
|
|
# We work around this restriction by creating a temporary Function
|
|
# Compute function in each region to access OSS via the internal OSS
|
|
# endpoints, which are not subject to these restrictions. Yes, this
|
|
# is somewhat absurd.
|
|
#
|
|
IPXE_CENSORSHIP_BYPASS_FUNCTION = f'''
|
|
const prefix = "{IPXE_STORAGE_PREFIX}";
|
|
''' + '''
|
|
const assert = require("node:assert");
|
|
const OSS = require("ali-oss");
|
|
exports.handler = async (event, context) => {
|
|
const payload = JSON.parse(event.toString());
|
|
console.log(JSON.stringify(payload));
|
|
const sources = payload.sources || {};
|
|
const dest = new OSS({
|
|
region: "oss-" + context.region,
|
|
internal: true,
|
|
bucket: payload.bucket,
|
|
accessKeyId: context.credentials.accessKeyId,
|
|
accessKeySecret: context.credentials.accessKeySecret,
|
|
stsToken: context.credentials.securityToken,
|
|
});
|
|
const current = ((await dest.listV2({prefix: prefix})).objects || [])
|
|
.map(x => x.name);
|
|
const wanted = Object.keys(sources);
|
|
const add = wanted.filter(x => ! current.includes(x));
|
|
const del = current.filter(x => ! wanted.includes(x));
|
|
assert(add.every(x => x.startsWith(prefix)));
|
|
assert(del.every(x => x.startsWith(prefix)));
|
|
if (add.length)
|
|
console.log("Creating: " + add.sort().join(", "));
|
|
if (del.length)
|
|
console.log("Deleting: " + del.sort().join(", "));
|
|
const copy = async (key) => {
|
|
for (const url of sources[key]) {
|
|
console.log("Downloading " + key + " from " + url);
|
|
try {
|
|
const download = await fetch(url, {signal: AbortSignal.timeout(15000)});
|
|
if (! download.ok)
|
|
throw new Error(download.status);
|
|
const content = await download.arrayBuffer();
|
|
console.log("Downloaded " + key);
|
|
console.log("Uploading " + key);
|
|
await dest.put(key, Buffer.from(content));
|
|
console.log("Uploaded " + key);
|
|
return;
|
|
} catch (err) {
|
|
console.error("Download failed", err);
|
|
}
|
|
}
|
|
throw new Error("All downloads failed for " + key);
|
|
};
|
|
await Promise.all([
|
|
...add.map(copy),
|
|
...(del.length ? [dest.deleteMulti(del)] : []),
|
|
]);
|
|
console.log("Finished");
|
|
};
|
|
'''
|
|
|
|
Clients = namedtuple('Clients', ['region', 'censored', 'ecs', 'fc', 'oss'])
|
|
Image = namedtuple('Image',
|
|
['path', 'family', 'name', 'arch', 'mode', 'key', 'public'])
|
|
|
|
def image(filename, basefamily, basename, public):
|
|
"""Construct image description"""
|
|
with tempfile.NamedTemporaryFile(mode='w+t') as mtoolsrc:
|
|
mtoolsrc.writelines([
|
|
'drive D:', f'file="{filename}"',
|
|
'drive P:', f'file="{filename}"', 'partition=4',
|
|
])
|
|
mtoolsrc.flush()
|
|
mdir = subprocess.run(['mdir', '-b', 'D:/EFI/BOOT', 'P:/EFI/BOOT'],
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|
check=False, env={'MTOOLSRC': mtoolsrc.name})
|
|
mapping = {
|
|
b'BOOTX64.EFI': 'x86_64',
|
|
b'BOOTAA64.EFI': 'arm64',
|
|
}
|
|
uefi = [v for k, v in mapping.items() if k in mdir.stdout]
|
|
suffix = ('-uefi-%s' % uefi[0].replace('_', '-') if len(uefi) == 1 else
|
|
'-uefi-multi' if uefi else '')
|
|
path = Path(filename)
|
|
family = '%s%s' % (basefamily, suffix)
|
|
name = '%s%s' % (basename, suffix)
|
|
arch = uefi[0] if len(uefi) == 1 else None if uefi else 'x86_64'
|
|
mode = 'UEFI' if uefi else 'BIOS'
|
|
key = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
|
|
return Image(path, family, name, arch, mode, key, public)
|
|
|
|
def all_regions():
|
|
"""Get list of all regions"""
|
|
cred = credentials.client.Client()
|
|
conf = openapi.models.Config(credential=cred, endpoint=ECS_ENDPOINT)
|
|
client = ecs.client.Client(conf)
|
|
req = ecs.models.DescribeRegionsRequest()
|
|
rsp = client.describe_regions(req)
|
|
regions = sorted(x.region_id for x in rsp.body.regions.region)
|
|
return regions
|
|
|
|
def account_id():
|
|
"""Get account ID"""
|
|
cred = credentials.client.Client()
|
|
conf = openapi.models.Config(credential=cred, endpoint=STS_ENDPOINT)
|
|
client = sts.client.Client(conf)
|
|
rsp = client.get_caller_identity()
|
|
return rsp.body.account_id
|
|
|
|
def role_arn(name):
|
|
"""Get role resource name"""
|
|
cred = credentials.client.Client()
|
|
conf = openapi.models.Config(credential=cred, endpoint=RAM_ENDPOINT)
|
|
client = ram.client.Client(conf)
|
|
req = ram.models.GetRoleRequest(role_name=name)
|
|
rsp = client.get_role(req)
|
|
return rsp.body.role.arn
|
|
|
|
def all_clients(region, account):
|
|
"""Construct all per-region clients"""
|
|
cred = credentials.client.Client()
|
|
ecsconf = openapi.models.Config(credential=cred, region_id=region)
|
|
fcep = '%s.%s.fc.aliyuncs.com' % (account, region)
|
|
fcconf = openapi.models.Config(credential=cred, endpoint=fcep)
|
|
osscred = oss.credentials.EnvironmentVariableCredentialsProvider()
|
|
ossconf = oss.config.Config(credentials_provider=osscred, region=region)
|
|
clients = Clients(
|
|
region=region,
|
|
censored=region.startswith('cn-'),
|
|
ecs=ecs.client.Client(ecsconf),
|
|
fc=fc.client.Client(fcconf),
|
|
oss=oss.client.Client(ossconf),
|
|
)
|
|
return clients
|
|
|
|
def delete_temp_function(clients, func):
|
|
"""Remove temporary function"""
|
|
logger.info("delete function %s %s" % (clients.region, func))
|
|
assert func.startswith(IPXE_STORAGE_PREFIX)
|
|
clients.fc.delete_function_with_options(func, {}, RUNTIME_OPTS)
|
|
|
|
def create_temp_function(clients, role):
|
|
"""Create temporary function (and remove any stale temporary functions)"""
|
|
req = fc.models.ListFunctionsRequest(prefix=IPXE_STORAGE_PREFIX)
|
|
try:
|
|
rsp = clients.fc.list_functions_with_options(req, {}, RUNTIME_OPTS)
|
|
except openapi.client.UnretryableException:
|
|
# AliCloud provides no other way to detect non-working regions
|
|
return None
|
|
funcs = [x.function_name for x in rsp.body.functions or ()]
|
|
for func in funcs:
|
|
delete_temp_function(clients, func)
|
|
if not clients.censored:
|
|
# Functions are not required in uncensored regions
|
|
return None
|
|
buf = io.BytesIO()
|
|
with zipfile.ZipFile(buf, 'w') as zfh:
|
|
zfh.writestr('index.js', IPXE_CENSORSHIP_BYPASS_FUNCTION)
|
|
zf = base64.b64encode(buf.getvalue()).decode()
|
|
code = fc.models.InputCodeLocation(zip_file=zf)
|
|
func = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4())
|
|
body = fc.models.CreateFunctionInput(
|
|
code=code,
|
|
function_name=func,
|
|
handler='index.handler',
|
|
memory_size=FC_MEMORY_SIZE_MB,
|
|
role=role,
|
|
runtime=FC_NODE_RUNTIME,
|
|
timeout=FC_TIMEOUT_SEC,
|
|
)
|
|
req = fc.models.CreateFunctionRequest(body=body)
|
|
rsp = clients.fc.create_function_with_options(req, {}, RUNTIME_OPTS)
|
|
logger.info("create function %s %s" % (clients.region, func))
|
|
return func
|
|
|
|
def call_temp_function(clients, func, payload):
|
|
"""Call temporary function"""
|
|
hdr = fc.models.InvokeFunctionHeaders(
|
|
x_fc_invocation_type='Sync',
|
|
x_fc_log_type='Tail',
|
|
)
|
|
body = json.dumps(payload)
|
|
req = fc.models.InvokeFunctionRequest(body=body)
|
|
rsp = clients.fc.invoke_function_with_options(func, req, hdr, RUNTIME_OPTS)
|
|
log = base64.b64decode(rsp.headers.get('x-fc-log-result', b'')).decode()
|
|
if rsp.status_code != http.HTTPStatus.OK:
|
|
raise RuntimeError(rsp)
|
|
if 'x-fc-error-type' in rsp.headers:
|
|
raise RuntimeError(log)
|
|
|
|
def delete_temp_bucket(clients, func, bucket):
|
|
"""Remove temporary bucket"""
|
|
logger.info("delete bucket %s %s" % (clients.region, bucket))
|
|
assert bucket.startswith(IPXE_STORAGE_PREFIX)
|
|
# Delete bucket contents
|
|
if not clients.censored:
|
|
# Uncensored region: use OSS API calls to delete contents
|
|
req = oss.models.ListObjectsV2Request(
|
|
bucket=bucket,
|
|
prefix=IPXE_STORAGE_PREFIX,
|
|
)
|
|
rsp = clients.oss.list_objects_v2(req)
|
|
delete = [x.key for x in rsp.contents or ()]
|
|
if delete:
|
|
req = oss.models.DeleteMultipleObjectsRequest(
|
|
bucket=bucket,
|
|
objects=[oss.models.DeleteObject(x) for x in delete],
|
|
)
|
|
rsp = clients.oss.delete_multiple_objects(req)
|
|
elif func:
|
|
# Censored region with FC: use function to delete contents
|
|
payload = {'bucket': bucket}
|
|
call_temp_function(clients, func, payload)
|
|
else:
|
|
# Censored region without FC: assume bucket must be empty,
|
|
# since we could not have uploaded to it in the first place
|
|
pass
|
|
# Delete the now-empty bucket
|
|
req = oss.models.DeleteBucketRequest(bucket=bucket)
|
|
rsp = clients.oss.delete_bucket(req)
|
|
|
|
def create_temp_bucket(clients, func):
|
|
"""Create temporary bucket (and remove any stale temporary buckets)"""
|
|
prefix = '%s%s-' % (IPXE_STORAGE_PREFIX, clients.region)
|
|
req = oss.models.ListBucketsRequest(prefix=prefix)
|
|
rsp = clients.oss.list_buckets(req)
|
|
buckets = [x.name for x in rsp.buckets or ()]
|
|
for bucket in buckets:
|
|
delete_temp_bucket(clients, func, bucket)
|
|
if clients.censored and not func:
|
|
# We cannot use OSS in censored regions with no Function Compute
|
|
return None
|
|
bucket = ('%s%s' % (prefix, uuid4()))[:OSS_BUCKET_NAME_LEN]
|
|
req = oss.models.PutBucketRequest(bucket=bucket)
|
|
try:
|
|
rsp = clients.oss.put_bucket(req)
|
|
except oss.exceptions.OperationError as exc:
|
|
# AliCloud provides no other way to detect non-working regions
|
|
if exc.unwrap().code == OSS_FORBIDDEN_REGION_CODE:
|
|
return None
|
|
raise exc
|
|
logger.info("create bucket %s %s" % (clients.region, bucket))
|
|
return bucket
|
|
|
|
def upload_object(clients, bucket, image):
|
|
"""Upload disk image object to uncensored bucket"""
|
|
logger.info("upload %s %s" % (clients.region, image.name))
|
|
req = oss.models.PutObjectRequest(bucket=bucket, key=image.key)
|
|
rsp = clients.oss.put_object_from_file(req, image.path)
|
|
req = oss.models.GetObjectRequest(bucket=bucket, key=image.key)
|
|
rsp = clients.oss.presign(req)
|
|
return rsp.url
|
|
|
|
def copy_objects(clients, bucket, func, uploads):
|
|
"""Copy disk image objects to censored bucket from uncensored bucket"""
|
|
logger.info("upload %s (censored)" % clients.region)
|
|
payload = {
|
|
'bucket': bucket,
|
|
'sources': {
|
|
key: random.choices([url for key, url in urls], k=FC_SOURCE_COUNT)
|
|
for key, urls in groupby(sorted(
|
|
((image.key, url) for (region, image), url in uploads.items())
|
|
), key=itemgetter(0))
|
|
}
|
|
}
|
|
call_temp_function(clients, func, payload)
|
|
|
|
def delete_image(clients, name):
|
|
"""Remove existing image (if applicable)"""
|
|
req = ecs.models.DescribeImagesRequest(
|
|
region_id=clients.region,
|
|
image_name=name,
|
|
image_owner_alias='self',
|
|
)
|
|
rsp = clients.ecs.describe_images_with_options(req, RUNTIME_OPTS)
|
|
for image in rsp.body.images.image or ():
|
|
logger.info("delete image %s %s (%s)" %
|
|
(clients.region, image.image_name, image.image_id))
|
|
# Tag associated snapshots for deletion
|
|
for disk in image.disk_device_mappings.disk_device_mapping or ():
|
|
snapshot_id = disk.snapshot_id
|
|
tag = ecs.models.TagResourcesRequestTag(
|
|
key=IPXE_SNAPSHOT_DELETE_TAG,
|
|
value=IPXE_SNAPSHOT_DELETE_TAG,
|
|
)
|
|
req = ecs.models.TagResourcesRequest(
|
|
region_id=clients.region,
|
|
resource_type='snapshot',
|
|
resource_id=[snapshot_id],
|
|
tag=[tag],
|
|
)
|
|
rsp = clients.ecs.tag_resources_with_options(req, RUNTIME_OPTS)
|
|
# Unpublish image
|
|
if image.is_public:
|
|
req = ecs.models.ModifyImageSharePermissionRequest(
|
|
region_id=clients.region,
|
|
image_id=image.image_id,
|
|
is_public=False,
|
|
)
|
|
rsp = clients.ecs.modify_image_share_permission_with_options(
|
|
req, RUNTIME_OPTS
|
|
)
|
|
# Delete image
|
|
req = ecs.models.DeleteImageRequest(
|
|
region_id=clients.region,
|
|
image_id=image.image_id
|
|
)
|
|
rsp = clients.ecs.delete_image_with_options(req, RUNTIME_OPTS)
|
|
# Delete any snapshots tagged for deletion
|
|
tag = ecs.models.ListTagResourcesRequestTag(
|
|
key=IPXE_SNAPSHOT_DELETE_TAG,
|
|
value=IPXE_SNAPSHOT_DELETE_TAG,
|
|
)
|
|
req = ecs.models.ListTagResourcesRequest(
|
|
region_id=clients.region,
|
|
resource_type='snapshot',
|
|
tag=[tag],
|
|
)
|
|
rsp = clients.ecs.list_tag_resources_with_options(req, RUNTIME_OPTS)
|
|
for snapshot in rsp.body.tag_resources.tag_resource or ():
|
|
logger.info("delete snapshot %s %s" %
|
|
(clients.region, snapshot.resource_id))
|
|
req = ecs.models.DeleteSnapshotRequest(
|
|
snapshot_id=snapshot.resource_id,
|
|
force=True,
|
|
)
|
|
rsp = clients.ecs.delete_snapshot_with_options(req, RUNTIME_OPTS)
|
|
|
|
def wait_for_task(clients, task_id):
|
|
"""Wait for task to complete"""
|
|
status = 'Unknowable'
|
|
for i in range(POLL_MAX_RETRIES):
|
|
time.sleep(POLL_INTERVAL_SEC)
|
|
req = ecs.models.DescribeTasksRequest(
|
|
region_id=clients.region,
|
|
task_ids=task_id,
|
|
)
|
|
try:
|
|
rsp = clients.ecs.describe_tasks_with_options(req, RUNTIME_OPTS)
|
|
except openapi.client.UnretryableException:
|
|
continue
|
|
assert len(rsp.body.task_set.task) == 1
|
|
assert rsp.body.task_set.task[0].task_id == task_id
|
|
status = rsp.body.task_set.task[0].task_status
|
|
if status not in ('Waiting', 'Processing'):
|
|
break
|
|
if status != 'Finished':
|
|
raise RuntimeError(status)
|
|
|
|
def wait_for_image(clients, image_id):
|
|
"""Wait for image to become available"""
|
|
status = 'Unknowable'
|
|
for i in range(POLL_MAX_RETRIES):
|
|
time.sleep(POLL_INTERVAL_SEC)
|
|
req = ecs.models.DescribeImagesRequest(
|
|
region_id=clients.region,
|
|
image_id=image_id,
|
|
)
|
|
try:
|
|
rsp = clients.ecs.describe_images_with_options(req, RUNTIME_OPTS)
|
|
except openapi.client.UnretryableException:
|
|
continue
|
|
if len(rsp.body.images.image):
|
|
assert len(rsp.body.images.image) == 1
|
|
assert rsp.body.images.image[0].image_id == image_id
|
|
status = rsp.body.images.image[0].status
|
|
if status != 'Creating':
|
|
break
|
|
if status != 'Available':
|
|
raise RuntimeError(status)
|
|
|
|
def import_image(clients, image, bucket):
|
|
"""Import image"""
|
|
logger.info("import %s %s" % (clients.region, image.name))
|
|
disk = ecs.models.ImportImageRequestDiskDeviceMapping(
|
|
disk_image_size = 1,
|
|
format = 'RAW',
|
|
ossbucket = bucket,
|
|
ossobject = image.key,
|
|
)
|
|
req = ecs.models.ImportImageRequest(
|
|
region_id=clients.region,
|
|
image_name=image.name,
|
|
architecture=image.arch,
|
|
boot_mode=image.mode,
|
|
disk_device_mapping=[disk],
|
|
client_token=str(uuid4()),
|
|
)
|
|
rsp = clients.ecs.import_image_with_options(req, RUNTIME_OPTS)
|
|
image_id = rsp.body.image_id
|
|
task_id = rsp.body.task_id
|
|
wait_for_task(clients, task_id)
|
|
wait_for_image(clients, image_id)
|
|
logger.info("image %s %s (%s)" %
|
|
(clients.region, image.name, image_id))
|
|
return image_id
|
|
|
|
def copy_image(clients, image, image_id, censored):
|
|
"""Copy imported image to censored region"""
|
|
logger.info("import %s %s via %s" %
|
|
(censored.region, image.name, clients.region))
|
|
req = ecs.models.CopyImageRequest(
|
|
region_id=clients.region,
|
|
image_id=image_id,
|
|
destination_region_id=censored.region,
|
|
destination_image_name=image.name,
|
|
client_token=str(uuid4()),
|
|
)
|
|
rsp = clients.ecs.copy_image_with_options(req, RUNTIME_OPTS)
|
|
copy_id = rsp.body.image_id
|
|
wait_for_image(censored, copy_id)
|
|
logger.info("image %s %s (%s)" % (censored.region, image.name, copy_id))
|
|
return copy_id
|
|
|
|
def finalise_image(clients, image, image_id):
|
|
"""Finalise image attributes and permissions"""
|
|
logger.info("finalise %s %s (%s)" % (clients.region, image.name, image_id))
|
|
req = ecs.models.ModifyImageAttributeRequest(
|
|
region_id=clients.region,
|
|
image_id=image_id,
|
|
image_family=image.family,
|
|
)
|
|
rsp = clients.ecs.modify_image_attribute_with_options(req, RUNTIME_OPTS)
|
|
if image.public:
|
|
req = ecs.models.ModifyImageSharePermissionRequest(
|
|
region_id=clients.region,
|
|
image_id=image_id,
|
|
is_public=True,
|
|
)
|
|
rsp = clients.ecs.modify_image_share_permission_with_options(
|
|
req, RUNTIME_OPTS
|
|
)
|
|
|
|
# Parse command-line arguments
|
|
parser = argparse.ArgumentParser(description="Import Alibaba Cloud image")
|
|
parser.add_argument('--verbose', '-v', action='count', default=0)
|
|
parser.add_argument('--name', '-n',
|
|
help="Base image name")
|
|
parser.add_argument('--family', '-f', default='ipxe',
|
|
help="Base family name")
|
|
parser.add_argument('--public', '-p', action='store_true',
|
|
help="Make image(s) public")
|
|
parser.add_argument('--overwrite', action='store_true',
|
|
help="Overwrite any existing image with same name")
|
|
parser.add_argument('--region', '-r', action='append',
|
|
help="AliCloud region(s)")
|
|
parser.add_argument('--role', '-R', default="AliyunFcDefaultRole",
|
|
help="AliCloud role for censorship bypass function")
|
|
parser.add_argument('image', nargs='+', help="iPXE disk image")
|
|
args = parser.parse_args()
|
|
|
|
# Configure logging
|
|
loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]
|
|
verbosity = min(args.verbose, (len(loglevels) - 1))
|
|
logging.basicConfig(level=loglevels[verbosity])
|
|
logging.getLogger('apscheduler').setLevel(logging.WARNING)
|
|
|
|
# Use default name if none specified
|
|
if not args.name:
|
|
args.name = '%s-%s' % (args.family,
|
|
datetime.date.today().strftime('%Y%m%d'))
|
|
|
|
# Use all regions if none specified
|
|
regions = args.region or all_regions()
|
|
|
|
# Construct image list
|
|
images = [image(x, args.family, args.name, args.public) for x in args.image]
|
|
imports = [(region, image) for region in regions for image in images]
|
|
workers = len(imports)
|
|
|
|
# Look up resource names
|
|
fcrole = role_arn(args.role)
|
|
|
|
# Construct per-region clients
|
|
account = account_id()
|
|
clients = {region: all_clients(region, account) for region in regions}
|
|
|
|
# Delete existing images from all regions, if applicable
|
|
if args.overwrite:
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(delete_image,
|
|
clients=clients[region],
|
|
name=image.name): (region, image)
|
|
for region, image in imports}
|
|
done = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Create temporary function in each censored region with usable FC
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(create_temp_function,
|
|
clients=clients[region],
|
|
role=fcrole): region
|
|
for region in regions}
|
|
funcs = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Create temporary bucket in each region with usable OSS
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(create_temp_bucket,
|
|
clients=clients[region],
|
|
func=funcs[region]): region
|
|
for region in regions}
|
|
buckets = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Upload image objects directly to each uncensored region with usable OSS
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(upload_object,
|
|
clients=clients[region],
|
|
bucket=buckets[region],
|
|
image=image): (region, image)
|
|
for region, image in imports
|
|
if buckets[region] and not funcs[region]}
|
|
uploads = {futures[x]: x.result() for x in as_completed(futures)}
|
|
if not uploads:
|
|
parser.error("At least one working non-Chinese region is required")
|
|
|
|
# Copy image objects to each censored region with usable OSS and usable FC
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(copy_objects,
|
|
clients=clients[region],
|
|
bucket=buckets[region],
|
|
func=funcs[region],
|
|
uploads=uploads): region
|
|
for region in regions
|
|
if buckets[region] and funcs[region]}
|
|
done = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Import images in each region with usable OSS
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(import_image,
|
|
clients=clients[region],
|
|
image=image,
|
|
bucket=buckets[region]): (region, image)
|
|
for region, image in imports if buckets[region]}
|
|
results = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Copy images to regions without usable OSS
|
|
#
|
|
# Copies are rate-limited by source region, so spread the copies
|
|
# across all available regions with imported images.
|
|
#
|
|
copies = [(region, censored, image) for region, (censored, image) in zip(
|
|
cycle(region for region in regions if buckets[region]),
|
|
((region, image) for region, image in imports if not buckets[region]),
|
|
)]
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(copy_image,
|
|
clients=clients[region],
|
|
censored=clients[censored],
|
|
image=image,
|
|
image_id=results[(region, image)]):
|
|
(censored, image)
|
|
for region, censored, image in copies}
|
|
copied = {futures[x]: x.result() for x in as_completed(futures)}
|
|
results.update(copied)
|
|
|
|
# Finalise images
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(finalise_image,
|
|
clients=clients[region],
|
|
image=image,
|
|
image_id=results[(region, image)]):
|
|
(region, image)
|
|
for region, image in imports}
|
|
done = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Remove temporary buckets
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(delete_temp_bucket,
|
|
clients=clients[region],
|
|
func=funcs[region],
|
|
bucket=buckets[region]): region
|
|
for region in regions if buckets[region]}
|
|
done = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Remove temporary functions
|
|
with ThreadPoolExecutor(max_workers=workers) as executor:
|
|
futures = {executor.submit(delete_temp_function,
|
|
clients=clients[region],
|
|
func=funcs[region]): region
|
|
for region in regions if funcs[region]}
|
|
done = {futures[x]: x.result() for x in as_completed(futures)}
|
|
|
|
# Show created images
|
|
for region, image in imports:
|
|
image_id = results[(region, image)]
|
|
print("%s %s (%s) %s" % (region, image.name, image.family, image_id))
|