Implements With RedisQueueRS for Better Scalability

This commit is contained in:
2024-03-30 00:43:01 -03:00
parent d12913d7a5
commit 4c3572a5a7
12 changed files with 348 additions and 398 deletions

551
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -19,9 +19,9 @@ serde_json = "1.0.114"
cached = "0.49.2"
log = "0.4.20"
lettre = { version = "0.11.4", default-features = false, features = ["smtp-transport", "tokio1-rustls-tls", "hostname", "builder"] }
redis = { version = "0.24.0", features = ["aio", "tokio-comp"] }
redis = { version = "0.25.2", features = ["aio", "tokio-comp"] }
tower-http = { version = "0.5.2", features = ["cors"] }
dotenvy = "0.15.7"
futures = "0.3.30"
deadqueue = "0.2.4"
redis-work-queue = "0.1.6"
redis-queue-rs = "0.1.1"

View File

@@ -1,10 +1,15 @@
use cached::proc_macro::cached;
use crate::config::config_auth::get_config_auth;
use crate::config::config_limits::get_config_limits;
use crate::config::config_redis::get_config_redis;
use crate::depends::depends_redis_client::get_depends_redis_client;
use crate::service::auth_service::AuthService;
use cached::proc_macro::cached;
#[cached]
pub fn get_depends_auth_service() -> AuthService {
AuthService::new(get_config_auth(), get_config_redis(), get_config_limits())
AuthService::new(
get_config_auth(),
get_depends_redis_client(),
get_config_limits(),
)
}

View File

@@ -1,8 +1,10 @@
use crate::config::config_email::get_config_email;
use crate::service::email_service::EmailService;
use cached::proc_macro::cached;
use crate::config::config_email::get_config_email;
use crate::depends::depends_redis_client::get_depends_redis_client;
use crate::service::email_service::EmailService;
#[cached]
pub fn get_depends_email_service() -> EmailService {
EmailService::new(get_config_email())
EmailService::new(get_config_email(), get_depends_redis_client())
}

View File

@@ -0,0 +1,15 @@
use crate::config::config_redis::get_config_redis;
use cached::proc_macro::cached;
#[cached]
pub fn get_depends_redis_client() -> redis::Client {
let config_redis = get_config_redis();
redis::Client::open(
format!(
"redis://{}:{}",
config_redis.redis_url, config_redis.redis_port
)
.as_str(),
)
.unwrap()
}

View File

@@ -1,2 +1,3 @@
pub mod depends_auth_service;
pub mod depends_email_service;
pub mod depends_redis_client;

View File

@@ -6,7 +6,7 @@ use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
pub async fn send_message(
Extension(auth_service): Extension<AuthService>,
Extension(email_service): Extension<EmailService>,
Extension(mut email_service): Extension<EmailService>,
Extension(author): Extension<MessageAuthor>,
Json(mut payload): Json<SendMessage>,
) -> impl IntoResponse {

View File

@@ -8,9 +8,9 @@ mod service;
mod utils;
use crate::config::config_server;
use crate::depends::depends_email_service;
#[tokio::main]
async fn main() {
async fn run_server() {
let server_config = config_server::get_config_server();
let app = route::create_route(server_config.allowed_origins);
@@ -23,3 +23,34 @@ async fn main() {
axum::serve(listener, app).await.unwrap();
}
async fn run_worker() {
let mut email_service = depends_email_service::get_depends_email_service();
loop {
email_service.create_send_message_task().await;
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
async fn run_both() {
futures::future::join(run_server(), run_worker()).await;
}
#[tokio::main]
async fn main() {
let args: Vec<String> = std::env::args().collect();
let runtime: &str;
if args.len() < 2 {
runtime = "both";
} else {
runtime = args[1].as_str();
}
match runtime {
"server" => run_server().await,
"worker" => run_worker().await,
"both" => run_both().await,
_ => panic!("Invalid argument"),
}
}

View File

@@ -1,5 +1,3 @@
use chrono::serde::ts_seconds::deserialize as from_ts;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
@@ -18,7 +16,4 @@ pub struct SendMessage {
pub subject: String,
pub message: String,
#[serde(deserialize_with = "from_ts")]
pub timestamp: DateTime<Utc>,
}

View File

@@ -1,32 +1,24 @@
use crate::config::config_auth::ConfigAuth;
use crate::config::config_limits::ConfigLimits;
use crate::config::config_redis::ConfigRedis;
use crate::model::send_message::MessageAuthor;
use redis::aio::MultiplexedConnection;
use redis::{AsyncCommands, ExistenceCheck, SetExpiry, SetOptions};
use reqwest::header::AUTHORIZATION;
use crate::config::config_auth::ConfigAuth;
use crate::config::config_limits::ConfigLimits;
use crate::model::send_message::MessageAuthor;
#[derive(Clone)]
pub struct AuthService {
auth_url: String,
redis: redis::Client,
redis_client: redis::Client,
max_requests: u32,
expiration_time: usize,
}
impl AuthService {
pub fn new(config_auth: ConfigAuth, config_redis: ConfigRedis, limits: ConfigLimits) -> Self {
let client = redis::Client::open(
format!(
"redis://{}:{}",
config_redis.redis_url, config_redis.redis_port
)
.as_str(),
)
.unwrap();
pub fn new(config_auth: ConfigAuth, redis_client: redis::Client, limits: ConfigLimits) -> Self {
AuthService {
redis_client,
auth_url: config_auth.auth_url,
redis: client,
max_requests: limits.max_requests,
expiration_time: limits.expiration_time,
}
@@ -59,7 +51,6 @@ impl AuthService {
}
pub async fn increase_user_request_count(&self, user: &MessageAuthor) -> bool {
let mut con = self.redis.get_async_connection().await.unwrap();
let current_request_key = format!(
"user-message:{}:requests:{}",
user.email,
@@ -71,18 +62,24 @@ impl AuthService {
.conditional_set(ExistenceCheck::NX)
.get(false);
return con
return self
.get_async_connection()
.await
.set_options(&current_request_key, 1, set_options)
.await
.expect("Error setting key");
}
async fn count_user_requests(&self, user: &MessageAuthor) -> u32 {
let mut con = self.redis.get_async_connection().await.unwrap();
let query_user_requests = format!("user-message:{}:requests:*", user.email);
let results: Vec<String>;
match con.keys(query_user_requests).await {
match self
.get_async_connection()
.await
.keys(query_user_requests)
.await
{
Ok(r) => {
results = r;
}
@@ -93,4 +90,11 @@ impl AuthService {
return results.len() as u32;
}
async fn get_async_connection(&self) -> MultiplexedConnection {
self.redis_client
.get_multiplexed_async_connection()
.await
.unwrap()
}
}

View File

@@ -1,71 +1,58 @@
use std::sync::Arc;
use crate::config::config_email::ConfigEmail;
use crate::model::send_message::SendMessage;
use lettre::message::Mailbox;
use lettre::{
transport::smtp::authentication::Credentials, Address, AsyncSmtpTransport, AsyncTransport,
Message, Tokio1Executor,
};
use redis_queue_rs::async_redis_queue::AsyncRedisQueue;
type MessageQueue = deadqueue::unlimited::Queue<SendMessage>;
use crate::config::config_email::ConfigEmail;
use crate::model::send_message::SendMessage;
#[derive(Clone)]
pub struct EmailService {
name: String,
email: String,
mailer: AsyncSmtpTransport<Tokio1Executor>,
message_queue: Arc<MessageQueue>
redis_client: redis::Client,
}
impl EmailService {
pub fn new(config_email: ConfigEmail) -> Self {
let email_service = EmailService {
name: config_email.smtp_name.clone(),
email: config_email.smtp_email.clone(),
mailer: AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(&config_email.smtp_server)
pub fn new(config_email: ConfigEmail, redis_client: redis::Client) -> EmailService {
let mailer =
AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(&config_email.smtp_server)
.unwrap()
.credentials(Credentials::new(
config_email.smtp_username,
config_email.smtp_password,
))
.port(config_email.smtp_port)
.build(),
message_queue: Arc::new(MessageQueue::new())
};
email_service.bind_executer();
.build();
email_service
EmailService {
mailer,
redis_client,
name: config_email.smtp_name.clone(),
email: config_email.smtp_email.clone(),
}
}
pub async fn send_email (
&self,
m: SendMessage
) {
self.message_queue.push(m);
pub async fn send_email(&mut self, m: SendMessage) {
self.get_message_queue().await.push(m).await;
}
fn bind_executer(&self) {
let local_self = self.clone();
let message_queue = self.message_queue.clone();
tokio::spawn(async move {
loop {
let message = message_queue.pop().await;
local_self.create_send_message_task(message).await;
}
});
}
pub(crate) async fn create_send_message_task(&mut self) {
let message = self.get_message_queue().await.pop().await;
if message.is_none() {
return;
}
async fn create_send_message_task(&self, m: SendMessage) {
let message = self.message_queue.pop().await;
match self.send_message_smtp(message.clone()).await {
let message = message.unwrap();
let message_author = message.author.clone().unwrap();
match self.send_message_smtp(message).await {
Ok(_) => {
println!(
"Email sent successfully from {} to {}",
message.author.unwrap().email,
self.email
message_author.email, self.email
);
}
Err(e) => {
@@ -105,4 +92,9 @@ impl EmailService {
sender.name, sender.email, self.name, self.email, m.subject, m.message
)
}
pub async fn get_message_queue(&self) -> AsyncRedisQueue<SendMessage> {
let redis_client = self.redis_client.clone();
AsyncRedisQueue::new("message-hideyoshi.com".to_string(), redis_client).await
}
}

View File

@@ -1,6 +1,6 @@
use axum::Router;
use http::Method;
use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
use tower_http::cors::{AllowOrigin, CorsLayer};
pub struct RouterBuilder {
router: Router,