Initial Functional Implementation of S3 MicroService

This commit is contained in:
2023-08-09 00:32:36 -03:00
commit 0b320a3222
23 changed files with 1161 additions and 0 deletions

View File

View File

@@ -0,0 +1,14 @@
from resize_image_service.config.config_server import get_config_server
from resize_image_service.controller import app
import uvicorn
def main():
    """Entry point: serve the FastAPI app with uvicorn on the configured host/port."""
    server_conf = get_config_server()
    host, port = server_conf["host"], server_conf["port"]
    uvicorn.run(app, host=host, port=port)


if __name__ == "__main__":
    main()

View File

View File

@@ -0,0 +1,12 @@
from dotenv import load_dotenv
import os
def get_config_redis():
    """Load environment variables (via .env) and return Redis connection settings.

    Returns:
        dict with "host" (str), "port" (int), and "password" (str | None),
        suitable for splatting into redis.Redis(**config).
    """
    load_dotenv()
    return {
        "host": os.environ.get("REDIS_HOST", "localhost"),
        # Environment variables are strings; cast so the Redis client always
        # receives a numeric port (the default 6379 was already an int).
        "port": int(os.environ.get("REDIS_PORT", 6379)),
        "password": os.environ.get("REDIS_PASSWORD", None),
    }

View File

@@ -0,0 +1,13 @@
from dotenv import load_dotenv
import os
def get_config_s3():
    """Load environment variables (via .env) and return S3 client settings.

    Every value defaults to None when its variable is unset; required keys
    are validated later, when the service is constructed.
    """
    load_dotenv()
    env_by_key = {
        "aws_access_key_id": "AWS_ACCESS_KEY_ID",
        "aws_secret_access_key": "AWS_SECRET_ACCESS_KEY",
        "region_name": "AWS_REGION_NAME",
        "bucket_name": "AWS_BUCKET_NAME",
    }
    return {key: os.environ.get(var) for key, var in env_by_key.items()}

View File

@@ -0,0 +1,11 @@
from dotenv import load_dotenv
import os
def get_config_server():
    """Load environment variables (via .env) and return HTTP server settings.

    Returns:
        dict with "host" (str) and "port" (int), suitable for passing to
        uvicorn.run(), which requires the port as an int.
    """
    load_dotenv()
    return {
        "host": os.environ.get("SERVER_HOST", "0.0.0.0"),
        # Environment variables are strings; uvicorn rejects a str port, so
        # cast (the default 8000 was already an int).
        "port": int(os.environ.get("SERVER_PORT", 8000)),
    }

View File

@@ -0,0 +1,7 @@
from resize_image_service.controller.s3_controller import s3_router
from fastapi import FastAPI
# FastAPI application object; all S3 endpoints are mounted under the /s3 prefix.
app = FastAPI()
app.include_router(s3_router, tags=["s3"], prefix="/s3")

View File

@@ -0,0 +1,46 @@
from resize_image_service.depends.depend_queue import dependency_queue
from resize_image_service.depends.depend_s3_service import (
dependency_s3_service,
)
from resize_image_service.service.s3_service import S3Service
from resize_image_service.utils.enums.file_type import FileType
from resize_image_service.utils.file_name_hash import file_name_hash
from resize_image_service.worker.s3_image_worker import s3_image_worker
from fastapi import Body, Depends, Form
from fastapi_utils.cbv import cbv
from fastapi_utils.inferring_router import InferringRouter
from rq import Queue
from typing import Annotated
# Router whose endpoints infer their response models from return annotations.
s3_router = InferringRouter()
@cbv(s3_router)
class S3Controller:
    """HTTP endpoints for presigned S3 links and background image processing.

    Uses the fastapi_utils class-based-view pattern: the class-level
    Depends() attributes are injected per request.
    """
    queue: Queue = Depends(dependency_queue, use_cache=True)
    s3_service: S3Service = Depends(dependency_s3_service, use_cache=True)
    @s3_router.get("/new_file_url/", status_code=200)
    def new_file_url(
        self,
        username: Annotated[str, Form()],
        file_postfix: Annotated[str, Form()],
        file_type: Annotated[FileType, Form()],
    ) -> dict[str, str]:
        """Return a presigned upload URL plus the final object URL.

        The object key is derived from username + file_postfix via
        file_name_hash, so the same user/postfix always maps to one key.
        NOTE(review): a GET endpoint declaring Form() parameters expects a
        request body, which many clients do not send with GET — confirm
        this is intended (query parameters may be a better fit).
        """
        return self.s3_service.get_temp_upload_link(
            file_name_hash(username, file_postfix), file_type
        )
    @s3_router.get("/file_url/", status_code=200)
    def file_url(
        self, username: Annotated[str, Form()], file_postfix: Annotated[str, Form()]
    ) -> dict[str, str]:
        """Return a presigned read URL for the user's stored object."""
        return self.s3_service.get_temp_read_link(
            file_name_hash(username, file_postfix)
        )
    @s3_router.post("/process_image/", status_code=200)
    def process_image(self, string_url: Annotated[str, Body(embed=True)]):
        """Enqueue an asynchronous resize/metadata-strip job for the given key."""
        self.queue.enqueue(s3_image_worker, string_url)

View File

View File

@@ -0,0 +1,8 @@
from resize_image_service.config.config_redis import get_config_redis
from redis import Redis
from rq import Queue
def dependency_queue():
    """Build an RQ Queue whose Redis connection comes from the env config."""
    redis_connection = Redis(**get_config_redis())
    return Queue(connection=redis_connection)

View File

@@ -0,0 +1,9 @@
from resize_image_service.config.config_s3 import get_config_s3
from resize_image_service.service.s3_service import S3Service
from functools import cache
@cache
def dependency_s3_service() -> S3Service:
    """Provide a process-wide, memoized S3Service built from the env config."""
    s3_config = get_config_s3()
    return S3Service(**s3_config)

View File

View File

@@ -0,0 +1,106 @@
from resize_image_service.utils.enums.file_type import CONTENT_TYPE, FileType
import boto3
from PIL import Image
import io
from typing import Any, Dict
class S3Service:
    """S3 access layer: presigned upload/read links and image post-processing.

    Required config kwargs: bucket_name, aws_access_key_id,
    aws_secret_access_key, region_name. A missing or empty value raises
    RuntimeError at construction time.
    """

    def __init__(self, **kwargs):
        # Validate before building the client so misconfiguration fails fast.
        self.__validate_config(**kwargs)
        self.bucket_name = kwargs.get("bucket_name")
        self.region_name = kwargs.get("region_name")
        self.s3 = boto3.client(
            "s3",
            aws_access_key_id=kwargs.get("aws_access_key_id"),
            aws_secret_access_key=kwargs.get("aws_secret_access_key"),
            region_name=kwargs.get("region_name"),
        )

    def get_temp_upload_link(
        self, file_name, file_type: FileType
    ) -> dict[str, str | Any]:
        """Return a presigned PUT URL (1 hour expiry) plus the object's public URL."""
        return {
            "presigned_url": self._get_presigned_right_url(file_name, file_type),
            "file_key": self._get_object_url(file_name),
        }

    def get_temp_read_link(self, file_name) -> dict[str, str | Any]:
        """Return a presigned GET URL (1 hour expiry) for an existing object."""
        return {"presigned_url": self._get_presigned_read_url(file_name)}

    def process_image(self, file_name) -> None:
        """Download the object, shrink it to fit 320x320, strip metadata, re-upload."""
        img = self._get_image_obj(file_name)
        img = self._resize_img(img)
        img = self._remove_img_metadata(img)
        self._upload_image(file_name, img)

    def _get_object_url(self, file_name: str):
        # Virtual-hosted-style S3 URL built from bucket + region + key.
        return f"https://{self.bucket_name}.s3.{self.region_name}.amazonaws.com/{file_name}"

    def _get_presigned_right_url(self, file_name, file_type: FileType):
        """Presign a put_object call; the uploader must send the matching Content-Type."""
        return self.s3.generate_presigned_url(
            "put_object",
            Params={
                "Bucket": self.bucket_name,
                "Key": file_name,
                "ContentType": CONTENT_TYPE[file_type],
            },
            ExpiresIn=3600,
        )

    def _get_presigned_read_url(self, file_name):
        """Presign a get_object call for the given key (1 hour expiry)."""
        return self.s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": self.bucket_name, "Key": file_name},
            ExpiresIn=3600,
        )

    def _get_image_obj(self, file_name: str):
        """Fetch the object body and open it as a PIL image (fully in memory)."""
        object_byte = io.BytesIO(
            self.s3.get_object(Bucket=self.bucket_name, Key=file_name)["Body"].read()
        )
        return Image.open(object_byte)

    def _upload_image(self, file_name: str, img: Image.Image) -> None:
        """Re-upload the processed image over the original key.

        NOTE(review): output is always re-encoded as PNG, even when the
        source was JPEG, and no ContentType is set on the upload — confirm
        both are intended.
        """
        new_byte_img = io.BytesIO()
        img.save(new_byte_img, format="PNG")
        new_byte_img.seek(0)
        self.s3.upload_fileobj(new_byte_img, Bucket=self.bucket_name, Key=file_name)

    @staticmethod
    def _resize_img(img):
        # thumbnail() resizes in place, preserving aspect ratio within 320x320.
        img.thumbnail((320, 320))
        return img

    @staticmethod
    def _remove_img_metadata(img):
        # Copy only the pixel data into a fresh image, dropping EXIF/metadata.
        data = list(img.getdata())
        image_without_exif = Image.new(img.mode, img.size)
        image_without_exif.putdata(data)
        return image_without_exif

    @staticmethod
    def __validate_config(**kwargs):
        """Raise RuntimeError naming the first required config key that is missing/empty."""
        # The original checks duplicated bucket_name; a loop over the four
        # required keys keeps one check per key with identical messages.
        for key in (
            "bucket_name",
            "aws_access_key_id",
            "aws_secret_access_key",
            "region_name",
        ):
            if not kwargs.get(key):
                raise RuntimeError(f"{key} is required")

View File

View File

@@ -0,0 +1,9 @@
from enum import Enum
class FileType(Enum):
    """Closed set of image formats the service accepts for upload."""

    PNG = "png"
    JPEG = "jpeg"


# MIME content type for each supported format ("image/<enum value>").
CONTENT_TYPE = {file_type: f"image/{file_type.value}" for file_type in FileType}

View File

@@ -0,0 +1,9 @@
import base64
from hashlib import md5
def file_name_hash(username: str, file_postfix: str) -> str:
    """Derive a deterministic object name from a username and file postfix.

    The username is MD5-hashed (used here for a stable, anonymized prefix,
    not for security) and base64-encoded, then joined with the postfix.
    NOTE(review): standard base64 output may contain '/' and '+', which can
    create pseudo-folders or need escaping in S3 keys/URLs — confirm intended.
    """
    digest = md5(username.encode("utf-8")).digest()
    encoded = base64.b64encode(digest).decode()
    return f"{encoded}_{file_postfix}"

View File

View File

@@ -0,0 +1,7 @@
from resize_image_service.depends.depend_s3_service import (
dependency_s3_service,
)
def s3_image_worker(string_url: str) -> None:
    """RQ job body: fetch, resize, strip metadata, and re-upload one object."""
    service = dependency_s3_service()
    service.process_image(string_url)