blob: faba9f06d2ac6547e875680fab3ad04ab5851a32 [file] [log] [blame]
import json
import re
import shutil
from pathlib import Path
from typing import Any, List
import boto3 # type: ignore[import]
def zip_folder(folder_to_zip: Path, dest_file_base_name: Path) -> Path:
"""
Returns the path to the resulting zip file, with the appropriate extension added if needed
"""
# shutil.make_archive will append .zip to the dest_file_name, so we need to remove it if it's already there
if dest_file_base_name.suffix == ".zip":
dest_file_base_name = dest_file_base_name.with_suffix("")
ensure_dir_exists(dest_file_base_name.parent)
print(f"Zipping {folder_to_zip}\n to {dest_file_base_name}")
# Convert to string because shutil.make_archive doesn't like Path objects
return Path(shutil.make_archive(str(dest_file_base_name), "zip", folder_to_zip))
def unzip_folder(zip_file_path: Path, unzip_to_folder: Path) -> None:
"""
Returns the path to the unzipped folder
"""
print(f"Unzipping {zip_file_path}")
print(f" to {unzip_to_folder}")
shutil.unpack_archive(zip_file_path, unzip_to_folder, "zip")
def ensure_dir_exists(dir: Path) -> None:
dir.mkdir(parents=True, exist_ok=True)
def copy_file(source_file: Path, dest_file: Path) -> None:
ensure_dir_exists(dest_file.parent)
shutil.copyfile(source_file, dest_file)
def load_json_file(file_path: Path) -> Any:
"""
Returns the deserialized json object
"""
with open(file_path) as f:
return json.load(f)
def write_json_file(file_path: Path, content: Any) -> None:
dir = file_path.parent
ensure_dir_exists(dir)
with open(file_path, "w") as f:
json.dump(content, f, indent=2)
def sanitize_for_s3(text: str) -> str:
"""
S3 keys can only contain alphanumeric characters, underscores, and dashes.
This function replaces all other characters with underscores.
"""
return re.sub(r"[^a-zA-Z0-9_-]", "_", text)
def upload_file_to_s3(file_name: Path, bucket: str, key: str) -> None:
print(f"Uploading {file_name}")
print(f" to s3://{bucket}/{key}")
boto3.client("s3").upload_file(
str(file_name),
bucket,
key,
)
def download_s3_objects_with_prefix(
bucket_name: str, prefix: str, download_folder: Path
) -> List[Path]:
s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)
downloads = []
for obj in bucket.objects.filter(Prefix=prefix):
download_path = download_folder / obj.key
ensure_dir_exists(download_path.parent)
print(f"Downloading s3://{bucket.name}/{obj.key}")
print(f" to {download_path}")
s3.Object(bucket.name, obj.key).download_file(str(download_path))
downloads.append(download_path)
if len(downloads) == 0:
print(
f"There were no files matching the prefix `{prefix}` in bucket `{bucket.name}`"
)
return downloads