Event-Driven Architecture is not new. It is implemented in many large systems: e-commerce websites, cryptocurrency exchanges, and game development tools such as Unreal Engine Blueprints. It addresses performance problems as well as asynchronous tasks. In distributed systems, and especially in cloud computing with services like AWS EventBridge and SQS or with microservices in general, this pattern is practically required.
However, if you are a Ruby on Rails developer, you might be confused, because Ruby on Rails is a full-stack framework: it combines frontend and backend in the same codebase and is often used for small to medium websites. We use background jobs like Sidekiq, with RabbitMQ or Amazon SQS as a message broker. So how do we implement Event-Driven Architecture? Well, we already know that Rails has ActiveRecord callbacks, which behave like an event mechanism. That's one example. Now let's see how to implement event handlers, as the title of this post promises.
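Before diving into Rails specifics, the event/handler idea itself can be sketched in a few lines of plain Ruby. This is only an illustration of the pattern; the `EventBus` class and the `:total_updated` event name are hypothetical, not part of any library.

```ruby
# A minimal, framework-free sketch of the event/handler pattern.
# Names (EventBus, :total_updated) are illustrative only.
class EventBus
  def initialize
    @handlers = Hash.new { |h, k| h[k] = [] }
  end

  # Register a handler (any callable) for an event name.
  def subscribe(event, &handler)
    @handlers[event] << handler
  end

  # Publish an event: every registered handler runs with the payload.
  def publish(event, payload = {})
    @handlers[event].each { |handler| handler.call(payload) }
  end
end

bus = EventBus.new
bus.subscribe(:total_updated) { |p| puts "Total is now #{p[:total]}" }
bus.publish(:total_updated, total: 42) # prints "Total is now 42"
```

Everything that follows, from callbacks to SQS pollers, is some variation of this publish/subscribe shape.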
I. Server-Side Implementation
We can implement handlers on both the server and the client side: the server side for crucial business logic, the client side for updating the UI.
1.1. Callbacks
Callbacks in Rails are like triggering an event: when we create or update a record, callbacks let us run additional tasks.
class MyModel < ApplicationRecord
  before_save :do_something, if: :additional_condition?

  private

  def additional_condition?
    # Define your additional condition here.
    # For example, check whether a specific attribute meets a certain condition.
    # Replace :attribute_name and 'condition_value' with your actual attribute and condition.
    attribute_name == 'condition_value'
  end

  def do_something
    # Implement your callback logic here.
    puts "Callback executed!"
  end
end
For example, suppose we want to run tasks after Total is updated. We check the Total value in the additional condition: this is a TotalUpdated event, implemented without any message broker. But if the logic is large and we have many event handlers for this model, we can't keep doing that, because of the drawbacks of callbacks (hidden side effects, hard-to-test logic, bloated models). In that case, consider permanent or periodic background jobs.
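The TotalUpdated idea can be shown without Rails at all. This framework-free sketch mirrors a guarded callback such as `after_save :publish_total_updated, if: :saved_change_to_total?`; the `Order` class and its attributes are hypothetical stand-ins.

```ruby
# Framework-free sketch of the "TotalUpdated" event: saving only fires
# the handler when the watched attribute actually changed, mirroring a
# conditional Rails callback. All names are illustrative.
class Order
  attr_reader :total

  def initialize(total)
    @total = total
    @handlers = []
  end

  def on_total_updated(&handler)
    @handlers << handler
  end

  def update(total:)
    changed = (total != @total)
    @total = total
    # Fire the event only when the value really changed,
    # just like the `if: :additional_condition?` guard on a callback.
    @handlers.each { |h| h.call(self) } if changed
    self
  end
end

order = Order.new(100)
order.on_total_updated { |o| puts "TotalUpdated: #{o.total}" }
order.update(total: 100) # no change, no event
order.update(total: 150) # prints "TotalUpdated: 150"
```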
1.2 Background Job
Jobs can be triggered from a model's callbacks. But when we have massive message volumes, we should consider a message broker such as RabbitMQ, AWS SQS, or EventBridge. But should we implement permanent jobs that do long polling?
Well, using permanent jobs for long polling in Ruby on Rails is not a recommended approach, for several reasons:
- Resource Consumption: Long-polling involves keeping connections open for an extended period, which can consume server resources (such as memory and CPU) and impact the scalability of your application. Permanent jobs would keep these resources occupied indefinitely, potentially leading to resource exhaustion and degraded performance.
- Scalability: Permanent jobs may not scale well, especially in scenarios with a large number of concurrent long-polling connections. As the number of connections increases, the server may struggle to handle the load, leading to slow response times or even failures.
- Resource Leakage: Permanent jobs can lead to resource leakage if they are not managed properly. For example, if a permanent job crashes or becomes stuck, it may not release the resources it occupies, leading to a buildup of stale connections and increased memory usage over time.
- Maintenance: Managing permanent jobs requires additional overhead for monitoring, troubleshooting, and maintenance. You need to ensure that the jobs are running correctly, handle errors gracefully, and clean up resources when they are no longer needed.
However, "not recommended" doesn't mean "never". We can reduce the drawbacks by using background jobs that perform periodic checks instead of permanent jobs. Still, a handful of permanent jobs is acceptable, depending on your resources. For example, I'm running three permanent jobs on an EC2 instance with 2 CPUs and 8 GB of RAM, and they have worked fine in production for years. But pay attention to your concurrency configuration: it must be larger than the number of permanent jobs so that other jobs can still be performed. Save some slots for other processes; otherwise, other jobs can't run because your permanent jobs occupy every worker.
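The periodic-check alternative can be sketched in plain Ruby: each run polls a source for a bounded time, then exits and is re-enqueued by a scheduler (sidekiq-cron, whenever, and so on). The `poll_with_deadline` name and the array standing in for a real queue are assumptions for illustration.

```ruby
# Sketch of "periodic checks instead of a permanent job": poll for a
# bounded time, then return so the scheduler can enqueue the next run.
# The array `source` is a stand-in for a real message queue.
def poll_with_deadline(source, timeout:, interval: 0.1)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  handled = []
  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
    message = source.shift # stand-in for "fetch one message"
    if message
      handled << message   # process the message
    else
      sleep interval       # nothing available; wait briefly
    end
  end
  handled # the job ends here; the scheduler enqueues the next run
end

queue = ["order.created", "order.paid"]
puts poll_with_deadline(queue, timeout: 0.3).inspect
```

Because each run has a hard deadline, a crashed or stuck iteration cannot hold a worker slot forever, which addresses the resource-leakage concern above.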
1.3 HTTP request
We can simply enqueue jobs triggered by HTTP requests, running the long polling with a timeout inside a background job. Why not?
# app/controllers/data_controller.rb
class DataController < ApplicationController
  def long_polling
    # Start a background job to perform the long-polling operation,
    # then respond immediately; the job pushes its result over ActionCable.
    LongPollingJob.perform_later
    head :accepted
  end
end

# app/jobs/long_polling_job.rb
class LongPollingJob < ApplicationJob
  queue_as :default

  def perform
    # Simulate long-polling logic (e.g., waiting for new data or a timeout).
    sleep 10
    # Generate some dummy data (replace with your actual data retrieval logic).
    data = { message: "New data is available!" }
    # Push the updated data to clients subscribed to the channel.
    ActionCable.server.broadcast("long_polling_channel", data)
  end
end
II. Client-Side Implementation
On the client side (e.g., JavaScript), implement long-polling logic that repeatedly requests a specific endpoint on the Rails server. The example below assumes an endpoint that holds the request open and responds with JSON once data is ready.
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Long-Polling Example</title>
</head>
<body>
  <div id="messages"></div>
  <script>
    function longPolling() {
      // Make a GET request to the server's long-polling endpoint
      fetch('/messages/long_polling')
        .then(response => response.json())
        .then(data => {
          // Handle the received data
          displayMessage(data.message);
          // After processing the response, initiate another long-polling request
          longPolling();
        })
        .catch(error => {
          console.error('Error during long-polling:', error);
          // Retry long-polling after a delay (optional)
          setTimeout(longPolling, 5000); // Retry after 5 seconds
        });
    }

    function displayMessage(message) {
      // Display the received message on the webpage
      const messagesDiv = document.getElementById('messages');
      const messageElement = document.createElement('p');
      messageElement.textContent = message;
      messagesDiv.appendChild(messageElement);
    }

    // Start the long-polling process when the page loads
    document.addEventListener('DOMContentLoaded', () => {
      longPolling();
    });
  </script>
</body>
</html>
III. Realtime
Long polling can power real-time tasks for a better UI, but keep these considerations in mind:
- Timeouts: Implement timeouts for long-polling connections to limit the duration of each connection and prevent resource exhaustion. If no new data is available within the timeout period, close the connection and allow the client to reconnect if necessary.
- Background Jobs: Use background jobs to perform periodic checks for new data and notify clients when updates are available. This allows you to control the frequency of data retrieval and reduce the impact on server resources.
- Event-Driven Architecture: Consider adopting an event-driven architecture using technologies like WebSockets or server-sent events (SSE) for real-time communication between clients and servers. These technologies are specifically designed for handling long-lived connections and can scale more effectively than long-polling.
- Third-Party Services: Explore third-party services or managed solutions that specialize in real-time communication and messaging, such as Pusher, Firebase, or AWS IoT. These services provide scalable infrastructure and APIs for building real-time applications without the overhead of managing long-polling connections yourself.
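The timeout advice above can be sketched with Ruby's standard library: a long-poll wait blocks until new data arrives or a deadline passes, and a timeout simply returns nothing so the client can reconnect. The `LongPollWaiter` class is a hypothetical illustration; in a real app the publisher side would be a callback or background job.

```ruby
# Sketch of a long-poll wait with a timeout, using only Ruby's stdlib
# Mutex and ConditionVariable. Names are illustrative.
class LongPollWaiter
  def initialize
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
    @data  = nil
  end

  # Called by the producer side (e.g. a model callback) when data changes.
  def publish(data)
    @mutex.synchronize do
      @data = data
      @cond.broadcast
    end
  end

  # Called by the long-polling endpoint: returns new data, or nil on
  # timeout so the client can simply reconnect.
  def wait_for_data(timeout:)
    @mutex.synchronize do
      @cond.wait(@mutex, timeout) if @data.nil?
      data, @data = @data, nil
      data
    end
  end
end

waiter = LongPollWaiter.new
Thread.new { sleep 0.1; waiter.publish("New data is available!") }
puts waiter.wait_for_data(timeout: 1).inspect
puts waiter.wait_for_data(timeout: 0.1).inspect # no publisher: times out, nil
```

Bounding every wait like this prevents a slow or silent producer from pinning a connection (and a worker) indefinitely.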
IV. For a simple monolith
These solutions are for a monolith; they can't be used across a distributed system. But each service is a monolith itself, isn't it?
4.1 Model attributes trigger
Use a dedicated Rails model to trigger the events instead of long-polling jobs. It's as if we collect values into a record: whenever a value is updated, an event is triggered.
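This "collect values, fire events" idea can be sketched without Rails. The `Shipment` class below is a hypothetical stand-in for such a model: each recorded value fires a per-attribute event, and once every expected value has landed, an aggregate event fires.

```ruby
# Framework-free sketch of an attribute-trigger model: values arrive one
# by one, each firing an event; a final event fires when the record is
# complete. All names are illustrative.
class Shipment
  EXPECTED = %i[picked_up in_transit delivered].freeze

  def initialize(&handler)
    @values  = {}
    @handler = handler # called as handler.call(event_name, payload)
  end

  def record(step, at:)
    @values[step] = at
    @handler.call(:"#{step}_recorded", at)
    # Once every expected step has a value, the aggregate event fires.
    @handler.call(:shipment_completed, @values) if (EXPECTED - @values.keys).empty?
  end
end

events = []
shipment = Shipment.new { |name, _payload| events << name }
shipment.record(:picked_up,  at: "2024-01-01")
shipment.record(:in_transit, at: "2024-01-02")
shipment.record(:delivered,  at: "2024-01-03")
puts events.inspect
```

In Rails, the `record` method would correspond to an update plus an `after_update_commit` callback inspecting which attributes changed.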
4.2 Status trigger
Consider using the aasm gem's callbacks:
class Job
  include AASM

  aasm do
    state :sleeping, initial: true, before_enter: :do_something
    state :running, before_enter: Proc.new { do_something && notify_somebody }
    state :finished

    after_all_transitions :log_status_change

    event :run, after: :notify_somebody do
      before do
        log('Preparing to run')
      end
      transitions from: :sleeping, to: :running, after: Proc.new { |*args| set_process(*args) }
      transitions from: :running, to: :finished, after: Proc.new { LogRunTime }
    end
  end
end
When we fire the run event with job.run, it performs whichever transition matches the current state:
- sleeping to running – executes the after Proc.
- running to finished – executes the LogRunTime class (which must respond to call) after the transition.
After the event finishes, it calls the notify_somebody method. This use case is useful for tracking object state changes. We can actually turn any step-by-step business logic into a model that tracks status: for example, tracking your cargo, or simulating AWS Step Functions. The problem is scalability, because this approach is tied to a Rails model.
In a larger distributed system, we can't do this.
Attention! When I say background job, I also mean that we can do long polling inside a background job with a timeout.
V. AWS SQS
There are two kinds of polling: short and long. In contrast to long polling, which waits a certain period for messages to become available, short polling makes frequent requests to check for messages, even if the queue is empty.
5.1 receive_message
receive_message calls the SQS API directly. By default, it performs short polling:
response = sqs_client.receive_message(
  queue_url: queue_url,
  max_number_of_messages: max_number_of_messages
)
Enable long polling by adding wait_time_seconds. The WaitTimeSeconds parameter determines the duration (in seconds) that the ReceiveMessage operation waits for messages to become available in the queue before returning a response to the client. It effectively controls the long-polling behavior of the ReceiveMessage operation.
resp = sqs.receive_message(queue_url: URL, max_number_of_messages: 10, wait_time_seconds: 10)
Thus, it can do both short and long polling.
5.2 QueuePoller Class
By default, QueuePoller does long polling and runs indefinitely. We can stop polling after a period of inactivity by setting idle_timeout.
poller = Aws::SQS::QueuePoller.new(receive_queue_url)
poller_stats = poller.poll({
  max_number_of_messages: 10,
  idle_timeout: 60 # Stop polling after 60 seconds with no messages (polls indefinitely by default).
}) do |messages|
  messages.each do |message|
    puts "Message body: #{message.body}"
  end
end
So we have two ways to handle messages in SQS: receive_message is low-level, and QueuePoller is high-level. I often use QueuePoller because it lets me focus on the logic that processes the message: it automatically handles long polling, retries, timeouts, and error handling. We have to do all of those things manually when using receive_message.