import boto3 from os import getenv from dotenv import load_dotenv import logging from botocore.exceptions import ClientError from hashlib import md5 from time import localtime load_dotenv() def init_client(): try: client = boto3.client("s3", aws_access_key_id=getenv("aws_access_key_id"), aws_secret_access_key=getenv( "aws_secret_access_key"), aws_session_token=getenv("aws_session_token"), region_name=getenv("aws_region_name") # config=botocore.client.Config( # connect_timeout=conf.remote_cfg["remote_timeout"], # read_timeout=conf.remote_cfg["remote_timeout"], # region_name=conf.remote_cfg["aws_default_region"], # retries={ # "max_attempts": conf.remote_cfg["remote_retries"]} ) # check if credentials are correct client.list_buckets() return client except ClientError as e: logging.error(e) except: logging.error("Unexpected error") def list_buckets(aws_s3_client): try: # https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListBuckets.html # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/list_buckets.html return aws_s3_client.list_buckets() except ClientError as e: logging.error(e) return False def create_bucket(aws_s3_client, bucket_name, region=getenv("aws_region_name")): # Create bucket try: location = {'LocationConstraint': region} # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/create_bucket.html response = aws_s3_client.create_bucket( Bucket=bucket_name, CreateBucketConfiguration=location ) except ClientError as e: logging.error(e) return False status_code = response["ResponseMetadata"]["HTTPStatusCode"] if status_code == 200: return True return False def delete_bucket(aws_s3_client, bucket_name): # Delete bucket try: # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/delete_bucket.html response = aws_s3_client.delete_bucket(Bucket=bucket_name) except ClientError as e: logging.error(e) return False status_code = response["ResponseMetadata"]["HTTPStatusCode"] if status_code == 200: return True return False def bucket_exists(aws_s3_client, bucket_name): try: response = aws_s3_client.head_bucket(Bucket=bucket_name) except ClientError as e: logging.error(e) return False status_code = response["ResponseMetadata"]["HTTPStatusCode"] if status_code == 200: return True return False def download_file_and_upload_to_s3(aws_s3_client, bucket_name, url, file_name, keep_local=False): from urllib.request import urlopen import io with urlopen(url) as response: content = response.read() try: aws_s3_client.upload_fileobj( Fileobj=io.BytesIO(content), Bucket=bucket_name, ExtraArgs={'ContentType': 'image/jpg'}, Key=file_name ) except Exception as e: logging.error(e) if keep_local: with open(file_name, mode='wb') as jpg_file: jpg_file.write(content) # public URL return "https://s3-{0}.amazonaws.com/{1}/{2}".format( 'us-west-2', bucket_name, file_name ) def set_object_access_policy(aws_s3_client, bucket_name, file_name): try: response = aws_s3_client.put_object_acl( ACL="public-read", Bucket=bucket_name, Key=file_name ) except ClientError as e: logging.error(e) return False status_code = response["ResponseMetadata"]["HTTPStatusCode"] if status_code == 200: return True return False def generate_public_read_policy(bucket_name): import json policy = { "Version": "2012-10-17", "Statement": [ { "Sid": "PublicReadGetObject", "Effect": "Allow", "Principal": "*", "Action": "s3:GetObject", "Resource": f"arn:aws:s3:::{bucket_name}/*", } ], } return json.dumps(policy) def create_bucket_policy(aws_s3_client, bucket_name): aws_s3_client.put_bucket_policy( Bucket=bucket_name, Policy=generate_public_read_policy(bucket_name) ) print("Bucket policy created successfully") def read_bucket_policy(aws_s3_client, bucket_name): try: policy = aws_s3_client.get_bucket_policy(Bucket=bucket_name) policy_str = policy["Policy"] print(policy_str) except ClientError as e: logging.error(e) return False if __name__ == "__main__": s3_client = init_client() # print(download_file_and_upload_to_s3( # s3_client, 'new-bucket-btu', # 'https://www.coreldraw.com/static/cdgs/images/free-trials/img-ui-cdgsx.jpg', # f'image_file_{md5(str(localtime()).encode("utf-8")).hexdigest()}.jpg', # keep_local=True # )) # create_bucket_policy(s3_client, 'new-bucket-btu') # read_bucket_policy(s3_client, 'new-bucket-btu') # buckets = list_buckets(s3_client) # if buckets: # for bucket in buckets['Buckets']: # print(f' {bucket["Name"]}') # print(f'created bucket status: { create_bucket(s3_client, "new-bucket-1-btu")}') # print(f'deleted bucket status: { delete_bucket(s3_client, "btudevopsteam1")}') # print(f'Bucket exists: { bucket_exists(s3_client, "automatinawsbttu-commandline")}') # print(f"set read status: {set_object_access_policy(s3_client, 'new-bucket-btu', 'image_file_78bc222b20d1ff69cdf1290a7537d5fd.jpg')}")