Merge pull request #17 from HideyoshiSolutions/develop

Develop - Implements With RedisQueueRS for Better Scalability
This commit is contained in:
2024-03-30 00:49:38 -03:00
committed by GitHub
12 changed files with 348 additions and 398 deletions

551
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -19,9 +19,9 @@ serde_json = "1.0.114"
cached = "0.49.2" cached = "0.49.2"
log = "0.4.20" log = "0.4.20"
lettre = { version = "0.11.4", default-features = false, features = ["smtp-transport", "tokio1-rustls-tls", "hostname", "builder"] } lettre = { version = "0.11.4", default-features = false, features = ["smtp-transport", "tokio1-rustls-tls", "hostname", "builder"] }
redis = { version = "0.24.0", features = ["aio", "tokio-comp"] } redis = { version = "0.25.2", features = ["aio", "tokio-comp"] }
tower-http = { version = "0.5.2", features = ["cors"] } tower-http = { version = "0.5.2", features = ["cors"] }
dotenvy = "0.15.7" dotenvy = "0.15.7"
futures = "0.3.30" futures = "0.3.30"
deadqueue = "0.2.4" deadqueue = "0.2.4"
redis-work-queue = "0.1.6" redis-queue-rs = "0.1.1"

View File

@@ -1,10 +1,15 @@
use cached::proc_macro::cached;
use crate::config::config_auth::get_config_auth; use crate::config::config_auth::get_config_auth;
use crate::config::config_limits::get_config_limits; use crate::config::config_limits::get_config_limits;
use crate::config::config_redis::get_config_redis; use crate::depends::depends_redis_client::get_depends_redis_client;
use crate::service::auth_service::AuthService; use crate::service::auth_service::AuthService;
use cached::proc_macro::cached;
#[cached] #[cached]
pub fn get_depends_auth_service() -> AuthService { pub fn get_depends_auth_service() -> AuthService {
AuthService::new(get_config_auth(), get_config_redis(), get_config_limits()) AuthService::new(
get_config_auth(),
get_depends_redis_client(),
get_config_limits(),
)
} }

View File

@@ -1,8 +1,10 @@
use crate::config::config_email::get_config_email;
use crate::service::email_service::EmailService;
use cached::proc_macro::cached; use cached::proc_macro::cached;
use crate::config::config_email::get_config_email;
use crate::depends::depends_redis_client::get_depends_redis_client;
use crate::service::email_service::EmailService;
#[cached] #[cached]
pub fn get_depends_email_service() -> EmailService { pub fn get_depends_email_service() -> EmailService {
EmailService::new(get_config_email()) EmailService::new(get_config_email(), get_depends_redis_client())
} }

View File

@@ -0,0 +1,15 @@
use crate::config::config_redis::get_config_redis;
use cached::proc_macro::cached;
#[cached]
pub fn get_depends_redis_client() -> redis::Client {
let config_redis = get_config_redis();
redis::Client::open(
format!(
"redis://{}:{}",
config_redis.redis_url, config_redis.redis_port
)
.as_str(),
)
.unwrap()
}

View File

@@ -1,2 +1,3 @@
pub mod depends_auth_service; pub mod depends_auth_service;
pub mod depends_email_service; pub mod depends_email_service;
pub mod depends_redis_client;

View File

@@ -6,7 +6,7 @@ use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
pub async fn send_message( pub async fn send_message(
Extension(auth_service): Extension<AuthService>, Extension(auth_service): Extension<AuthService>,
Extension(email_service): Extension<EmailService>, Extension(mut email_service): Extension<EmailService>,
Extension(author): Extension<MessageAuthor>, Extension(author): Extension<MessageAuthor>,
Json(mut payload): Json<SendMessage>, Json(mut payload): Json<SendMessage>,
) -> impl IntoResponse { ) -> impl IntoResponse {

View File

@@ -8,9 +8,9 @@ mod service;
mod utils; mod utils;
use crate::config::config_server; use crate::config::config_server;
use crate::depends::depends_email_service;
#[tokio::main] async fn run_server() {
async fn main() {
let server_config = config_server::get_config_server(); let server_config = config_server::get_config_server();
let app = route::create_route(server_config.allowed_origins); let app = route::create_route(server_config.allowed_origins);
@@ -23,3 +23,34 @@ async fn main() {
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
} }
async fn run_worker() {
let mut email_service = depends_email_service::get_depends_email_service();
loop {
email_service.create_send_message_task().await;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
async fn run_both() {
futures::future::join(run_server(), run_worker()).await;
}
#[tokio::main]
async fn main() {
let args: Vec<String> = std::env::args().collect();
let runtime: &str;
if args.len() < 2 {
runtime = "both";
} else {
runtime = args[1].as_str();
}
match runtime {
"server" => run_server().await,
"worker" => run_worker().await,
"both" => run_both().await,
_ => panic!("Invalid argument"),
}
}

View File

@@ -1,5 +1,3 @@
use chrono::serde::ts_seconds::deserialize as from_ts;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_with::serde_as; use serde_with::serde_as;
@@ -18,7 +16,4 @@ pub struct SendMessage {
pub subject: String, pub subject: String,
pub message: String, pub message: String,
#[serde(deserialize_with = "from_ts")]
pub timestamp: DateTime<Utc>,
} }

View File

@@ -1,32 +1,24 @@
use crate::config::config_auth::ConfigAuth; use redis::aio::MultiplexedConnection;
use crate::config::config_limits::ConfigLimits;
use crate::config::config_redis::ConfigRedis;
use crate::model::send_message::MessageAuthor;
use redis::{AsyncCommands, ExistenceCheck, SetExpiry, SetOptions}; use redis::{AsyncCommands, ExistenceCheck, SetExpiry, SetOptions};
use reqwest::header::AUTHORIZATION; use reqwest::header::AUTHORIZATION;
use crate::config::config_auth::ConfigAuth;
use crate::config::config_limits::ConfigLimits;
use crate::model::send_message::MessageAuthor;
#[derive(Clone)] #[derive(Clone)]
pub struct AuthService { pub struct AuthService {
auth_url: String, auth_url: String,
redis: redis::Client, redis_client: redis::Client,
max_requests: u32, max_requests: u32,
expiration_time: usize, expiration_time: usize,
} }
impl AuthService { impl AuthService {
pub fn new(config_auth: ConfigAuth, config_redis: ConfigRedis, limits: ConfigLimits) -> Self { pub fn new(config_auth: ConfigAuth, redis_client: redis::Client, limits: ConfigLimits) -> Self {
let client = redis::Client::open(
format!(
"redis://{}:{}",
config_redis.redis_url, config_redis.redis_port
)
.as_str(),
)
.unwrap();
AuthService { AuthService {
redis_client,
auth_url: config_auth.auth_url, auth_url: config_auth.auth_url,
redis: client,
max_requests: limits.max_requests, max_requests: limits.max_requests,
expiration_time: limits.expiration_time, expiration_time: limits.expiration_time,
} }
@@ -59,7 +51,6 @@ impl AuthService {
} }
pub async fn increase_user_request_count(&self, user: &MessageAuthor) -> bool { pub async fn increase_user_request_count(&self, user: &MessageAuthor) -> bool {
let mut con = self.redis.get_async_connection().await.unwrap();
let current_request_key = format!( let current_request_key = format!(
"user-message:{}:requests:{}", "user-message:{}:requests:{}",
user.email, user.email,
@@ -71,18 +62,24 @@ impl AuthService {
.conditional_set(ExistenceCheck::NX) .conditional_set(ExistenceCheck::NX)
.get(false); .get(false);
return con return self
.get_async_connection()
.await
.set_options(&current_request_key, 1, set_options) .set_options(&current_request_key, 1, set_options)
.await .await
.expect("Error setting key"); .expect("Error setting key");
} }
async fn count_user_requests(&self, user: &MessageAuthor) -> u32 { async fn count_user_requests(&self, user: &MessageAuthor) -> u32 {
let mut con = self.redis.get_async_connection().await.unwrap();
let query_user_requests = format!("user-message:{}:requests:*", user.email); let query_user_requests = format!("user-message:{}:requests:*", user.email);
let results: Vec<String>; let results: Vec<String>;
match con.keys(query_user_requests).await { match self
.get_async_connection()
.await
.keys(query_user_requests)
.await
{
Ok(r) => { Ok(r) => {
results = r; results = r;
} }
@@ -93,4 +90,11 @@ impl AuthService {
return results.len() as u32; return results.len() as u32;
} }
async fn get_async_connection(&self) -> MultiplexedConnection {
self.redis_client
.get_multiplexed_async_connection()
.await
.unwrap()
}
} }

View File

@@ -1,71 +1,58 @@
use std::sync::Arc;
use crate::config::config_email::ConfigEmail;
use crate::model::send_message::SendMessage;
use lettre::message::Mailbox; use lettre::message::Mailbox;
use lettre::{ use lettre::{
transport::smtp::authentication::Credentials, Address, AsyncSmtpTransport, AsyncTransport, transport::smtp::authentication::Credentials, Address, AsyncSmtpTransport, AsyncTransport,
Message, Tokio1Executor, Message, Tokio1Executor,
}; };
use redis_queue_rs::async_redis_queue::AsyncRedisQueue;
use crate::config::config_email::ConfigEmail;
type MessageQueue = deadqueue::unlimited::Queue<SendMessage>; use crate::model::send_message::SendMessage;
#[derive(Clone)] #[derive(Clone)]
pub struct EmailService { pub struct EmailService {
name: String, name: String,
email: String, email: String,
mailer: AsyncSmtpTransport<Tokio1Executor>, mailer: AsyncSmtpTransport<Tokio1Executor>,
redis_client: redis::Client,
message_queue: Arc<MessageQueue>
} }
impl EmailService { impl EmailService {
pub fn new(config_email: ConfigEmail) -> Self { pub fn new(config_email: ConfigEmail, redis_client: redis::Client) -> EmailService {
let email_service = EmailService { let mailer =
name: config_email.smtp_name.clone(), AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(&config_email.smtp_server)
email: config_email.smtp_email.clone(),
mailer: AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(&config_email.smtp_server)
.unwrap() .unwrap()
.credentials(Credentials::new( .credentials(Credentials::new(
config_email.smtp_username, config_email.smtp_username,
config_email.smtp_password, config_email.smtp_password,
)) ))
.port(config_email.smtp_port) .port(config_email.smtp_port)
.build(), .build();
message_queue: Arc::new(MessageQueue::new())
};
email_service.bind_executer();
email_service EmailService {
mailer,
redis_client,
name: config_email.smtp_name.clone(),
email: config_email.smtp_email.clone(),
}
} }
pub async fn send_email ( pub async fn send_email(&mut self, m: SendMessage) {
&self, self.get_message_queue().await.push(m).await;
m: SendMessage
) {
self.message_queue.push(m);
} }
fn bind_executer(&self) { pub(crate) async fn create_send_message_task(&mut self) {
let local_self = self.clone(); let message = self.get_message_queue().await.pop().await;
let message_queue = self.message_queue.clone(); if message.is_none() {
tokio::spawn(async move { return;
loop { }
let message = message_queue.pop().await;
local_self.create_send_message_task(message).await;
}
});
}
async fn create_send_message_task(&self, m: SendMessage) { let message = message.unwrap();
let message = self.message_queue.pop().await; let message_author = message.author.clone().unwrap();
match self.send_message_smtp(message.clone()).await { match self.send_message_smtp(message).await {
Ok(_) => { Ok(_) => {
println!( println!(
"Email sent successfully from {} to {}", "Email sent successfully from {} to {}",
message.author.unwrap().email, message_author.email, self.email
self.email
); );
} }
Err(e) => { Err(e) => {
@@ -105,4 +92,9 @@ impl EmailService {
sender.name, sender.email, self.name, self.email, m.subject, m.message sender.name, sender.email, self.name, self.email, m.subject, m.message
) )
} }
pub async fn get_message_queue(&self) -> AsyncRedisQueue<SendMessage> {
let redis_client = self.redis_client.clone();
AsyncRedisQueue::new("message-hideyoshi.com".to_string(), redis_client).await
}
} }

View File

@@ -1,6 +1,6 @@
use axum::Router; use axum::Router;
use http::Method; use http::Method;
use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer}; use tower_http::cors::{AllowOrigin, CorsLayer};
pub struct RouterBuilder { pub struct RouterBuilder {
router: Router, router: Router,