Ingesting and Monitoring Custom Metrics in CloudWatch With AWS Lambda

RabbitMQ is a popular message broker that sends and receives messages between different systems. One use case for RabbitMQ is as an event source for AWS Lambda, allowing Lambda functions to be triggered by messages sent to RabbitMQ queues.

This article examines how to use Terraform to set up RabbitMQ as an event source for Lambda and view its metrics in CloudWatch. We use RabbitMQ's Total, Ready, and Unacked metrics as custom metrics.

RabbitMQ also provides metrics that can be used to monitor the system's health. The metrics we look at are:

    1. Ready: the number of messages that are currently ready to be delivered to consumers.

    2. Total: the total number of messages in the queue.

    3. Unacked: the number of messages that have been delivered to a consumer but have not been acknowledged yet.
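
To make the relationship between the three counters concrete, here is a minimal Python sketch. It assumes a queue object shaped like the RabbitMQ management API response, with the fields `messages`, `messages_ready`, and `messages_unacknowledged` (the same fields the Lambda function later in this article reads). The sample values and the helper name `summarize_queue` are made up for illustration.

```python
def summarize_queue(queue: dict) -> dict:
    """Extract the three counters from one queue object."""
    ready = queue["messages_ready"]
    unacked = queue["messages_unacknowledged"]
    total = queue["messages"]
    return {
        "total": total,
        "ready": ready,
        "unacked": unacked,
        # Total is normally the sum of ready and unacked messages
        "consistent": total == ready + unacked,
    }


# Hypothetical sample data, not taken from a real broker
sample_queue = {
    "name": "example",
    "messages": 12,
    "messages_ready": 9,
    "messages_unacknowledged": 3,
}
summary = summarize_queue(sample_queue)
print(summary)
```

Because the broker samples these statistics periodically, the counters may briefly disagree on a busy queue, so the `consistent` flag is better treated as a sanity check than as a hard invariant.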

By using these metrics in CloudWatch, developers can monitor the health of the RabbitMQ queue and automatically scale the system up or down as needed.

For example, if the Ready metric starts to grow, it may indicate insufficient consumers to handle the load. In this case, an auto-scaling policy can be triggered to add more instances of the consumer component to handle the additional load.
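
As a rough illustration of that idea, the sketch below builds the arguments for a CloudWatch alarm on the Ready metric using boto3's `put_metric_alarm`. The namespace and metric name match the ones the Lambda function in this article publishes; the alarm name, the threshold, the evaluation settings, and the scaling policy ARN are all placeholder assumptions that would need to be adapted to a real setup.

```python
def build_ready_alarm(threshold: int, scale_out_policy_arn: str) -> dict:
    """Build keyword arguments for CloudWatch's put_metric_alarm call."""
    return {
        "AlarmName": "rabbitmq-ready-messages-high",  # hypothetical name
        "Namespace": "QueueMetrics",
        "MetricName": "ReadyMessages",
        "Statistic": "Average",
        "Period": 60,             # one datapoint per minute
        "EvaluationPeriods": 3,   # assumed: 3 consecutive minutes over the threshold
        "Threshold": float(threshold),
        "ComparisonOperator": "GreaterThanThreshold",
        "AlarmActions": [scale_out_policy_arn],
        "TreatMissingData": "notBreaching",
    }


def create_ready_alarm(params: dict) -> None:
    """Create the alarm; needs AWS credentials, so it is not called here."""
    import boto3  # imported lazily so the builder works without AWS access

    boto3.client("cloudwatch").put_metric_alarm(**params)


# The ARN below is a placeholder, not a real scaling policy
alarm_params = build_ready_alarm(
    100, "arn:aws:autoscaling:region:account-id:scalingPolicy:example"
)
print(alarm_params["MetricName"], alarm_params["Threshold"])
```

Keeping the parameter builder separate from the AWS call makes it possible to review or unit-test the alarm definition without touching an account.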

Fig: RabbitMQ as an event source for Lambda

Prerequisites:

    1. An AWS account with access to the required AWS services.

    2. Terraform installed on your system.

    3. A running RabbitMQ server and its queue endpoint.

The queue endpoint will look like this: https://ihflktln:***@shark.rmq.cloudamqp.com/api/queues/ihflktln
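
To show what this URL is made of, here is a small sketch that splits such an endpoint into its parts with Python's standard library. It assumes the URL follows the management API shape `/api/queues/<vhost>`, with the credentials embedded before the host; the example URL and the helper name `describe_endpoint` are placeholders.

```python
from urllib.parse import urlsplit


def describe_endpoint(url: str) -> dict:
    """Split a management API URL into its parts without exposing the password."""
    parts = urlsplit(url)
    return {
        "host": parts.hostname,
        "username": parts.username,
        "has_password": parts.password is not None,
        "path": parts.path,
        # Last path segment; in this URL shape it is the virtual host
        "vhost": parts.path.rstrip("/").split("/")[-1],
    }


# Placeholder URL, not a real broker
info = describe_endpoint("https://user:secret@broker.example.com/api/queues/myvhost")
print(info)
```

Since the URL carries the username and password, it is worth treating the whole endpoint as a secret rather than logging or committing it.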


When RabbitMQ is configured as an event source for Lambda, a trigger monitors a specific message queue. When a message arrives on that queue, Lambda automatically executes the code associated with the trigger.

For the metrics, we use a Lambda function that reads the Total, Ready, and Unacked counts from the RabbitMQ endpoint and publishes them to CloudWatch under the QueueMetrics namespace. The Lambda function is as follows:

import requests
import boto3
import os

def put_metrics_to_cloudwatch(total, ready, unacked):
    client = boto3.client('cloudwatch')
    client.put_metric_data(
        Namespace='QueueMetrics',
        MetricData=[
            {
                'MetricName': 'TotalMessages',
                'Value': total,
                'Unit': 'Count'
            },
            {
                'MetricName': 'ReadyMessages',
                'Value': ready,
                'Unit': 'Count'
            },
            {
                'MetricName': 'UnackedMessages',
                'Value': unacked,
                'Unit': 'Count'
            }
        ]
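    )


def lambda_handler(event, context):
    # Management API URL of the queues, read from the function's environment
    endpoint = os.environ['RABBITMQ_ENDPOINT']
    response = requests.get(endpoint, timeout=10)
    response.raise_for_status()  # fail loudly on HTTP errors
    # The API returns a list of queues; this function reads the first one
    data = response.json()
    print(data)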
    total = data[0]['messages']
    ready = data[0]['messages_ready']
    unacked = data[0]['messages_unacknowledged']
    print("Total messages:", total)
    print("Ready messages:", ready)
    print("Unacked messages:", unacked)
    put_metrics_to_cloudwatch(total, ready, unacked)
    return {'total':total, 'ready':ready, 'unacked':unacked}
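
The handler above always reads the first queue in the response (`data[0]`). If the endpoint returns several queues, or none, a slightly more defensive lookup may be useful. The sketch below is one possible variant, not part of the original function: it assumes each queue object carries a `name` field, and the helper name `extract_counts` and the sample queues are hypothetical. It can be run locally without AWS or a broker.

```python
from typing import Optional


def extract_counts(queues: list, queue_name: Optional[str] = None) -> dict:
    """Return total/ready/unacked for one queue from the API's list response."""
    if not queues:
        raise ValueError("the management API returned no queues")
    if queue_name is None:
        queue = queues[0]  # same behavior as the handler above
    else:
        matches = [q for q in queues if q.get("name") == queue_name]
        if not matches:
            raise ValueError(f"queue {queue_name!r} not found")
        queue = matches[0]
    return {
        "total": queue.get("messages", 0),
        "ready": queue.get("messages_ready", 0),
        "unacked": queue.get("messages_unacknowledged", 0),
    }


# Hypothetical response with two queues
sample_response = [
    {"name": "orders", "messages": 5, "messages_ready": 4, "messages_unacknowledged": 1},
    {"name": "emails", "messages": 2, "messages_ready": 0, "messages_unacknowledged": 2},
]
counts = extract_counts(sample_response, queue_name="emails")
print(counts)
```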

Terraform to Create and Manage Resources

Once we have our Lambda function and RabbitMQ queue set up, we can use Terraform to create and manage the resources. The Terraform configuration file will look something like this:

#lambda.tf
locals {
  function_name = var.lambda_function_name != null ? var.lambda_function_name : "RabbitMQ-${var.env}-lamd-metric-ingestor"
  lambda_tags = {
    Name = local.function_name
    Env  = var.env
  }
}
module "lambda_function" {
  source        = "terraform-aws-modules/lambda/aws"
  version       = "3.2.0"
  function_name = local.function_name
  description   = local.function_name
  handler       = "index.lambda_handler"
  runtime       = "python3.8"
  attach_policy_json = true
  policy_json = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData",
        "events:PutTargets"
      ],
      "Resource": "*"
    }
  ]
}
EOF
  source_path = [
    {
      path             = "./src",
      pip_requirements = true
    }
  ]
  timeout = 60
  publish = true
  tags    = merge(local.lambda_tags, var.tags)

  environment_variables = {
    RABBITMQ_ENDPOINT = var.rabbitmq_endpoint
  }
}
resource "aws_cloudwatch_event_rule" "lambda_function" {
  name        = local.function_name
  description = "Sparrow cron rule fires every minute"
  schedule_expression = "cron(*/1 * * * ? *)"
}

resource "aws_cloudwatch_event_target" "lambda_function" {
  rule = aws_cloudwatch_event_rule.lambda_function.name
  target_id = "lambda_function"
  arn = module.lambda_function.lambda_function_arn
}
resource "aws_lambda_permission" "allow_cw_to_call_lambda_function" {
  statement_id = "AllowExecutionFromCloudWatch"
  action = "lambda:InvokeFunction"
  function_name = module.lambda_function.lambda_function_name
  principal = "events.amazonaws.com"
  source_arn = aws_cloudwatch_event_rule.lambda_function.arn
}

A second file, vars.tf, declares the Terraform variables. These variables pass values into the configuration and provide dynamic values to the resources being created, as shown below.

#vars.tf
variable "env" {
  type        = string
  description = "Env to create resources on"
  default     = "dev"
}
variable "tags" {
  type        = map(string)
  description = "Tags"
  default     = {}
}
variable "lambda_function_name" {
  type        = string
  description = "(optional) Lambda function name"
  default     = null
}
variable "rabbitmq_endpoint" {
  type        = string
  description = "RabbitMQ endpoints"
  default = "https://ihflktln:***@shark.rmq.cloudamqp.com/api/queues/ihflktln"
}

The provider.tf file configures the providers that Terraform uses to interact with external services, as shown below:

provider "aws" {
  region = "ap-southeast-1"
  access_key = "your_access_key"
  secret_key = "your_secret_key"
}

The scheduled CloudWatch event rule that triggers this Lambda every minute is already part of the lambda.tf file above. The relevant block is:

resource "aws_cloudwatch_event_rule" "lambda_function" {
  name        = local.function_name
  description = "Sparrow cron rule fires every minute"
  schedule_expression = "cron(*/1 * * * ? *)"
}

Once all of the Terraform configuration files are created, we can provision the resources by running terraform init followed by terraform apply on the command line. Once Terraform has provisioned the infrastructure, the Lambda function appears in the AWS console.

Fig: Lambda Function Created by terraform

The Lambda function also ingests the RabbitMQ metrics Total, Ready, and Unacked into CloudWatch.

Fig: AWS CloudWatch, where we can see the ingested metrics by Lambda function on QueueMetrics namespace.

Fig: CloudWatch Custom metrics from our RabbitMQ
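
Besides the console, the ingested datapoints can also be read back programmatically. The following sketch builds a query for boto3's `get_metric_statistics` against the QueueMetrics namespace; the ten-minute window, the `Maximum` statistic, and the helper names are assumptions chosen for illustration, and the actual fetch needs AWS credentials, so it is defined but not called here.

```python
from datetime import datetime, timedelta, timezone


def build_stats_query(metric_name: str, minutes: int = 10, now=None) -> dict:
    """Build keyword arguments for CloudWatch's get_metric_statistics call."""
    end = now or datetime.now(timezone.utc)
    return {
        "Namespace": "QueueMetrics",
        "MetricName": metric_name,
        "StartTime": end - timedelta(minutes=minutes),
        "EndTime": end,
        "Period": 60,                # matches the one-minute schedule
        "Statistics": ["Maximum"],   # assumed statistic
    }


def fetch_datapoints(query: dict) -> list:
    """Fetch datapoints sorted by time; requires AWS credentials."""
    import boto3  # imported lazily so the query builder works offline

    response = boto3.client("cloudwatch").get_metric_statistics(**query)
    return sorted(response["Datapoints"], key=lambda p: p["Timestamp"])


query = build_stats_query("ReadyMessages", minutes=10)
print(query["MetricName"], query["EndTime"] - query["StartTime"])
```

With the function running once per minute, a ten-minute window should contain roughly ten datapoints per metric if ingestion is working.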

Congratulations! You have successfully ingested all the required RabbitMQ metrics into CloudWatch.

Overall, by configuring RabbitMQ as an event source for Lambda and using metrics in CloudWatch to monitor the health of the queue, developers can create a robust and scalable distributed system that can handle various events and messages.


Read the next part of this blog: "AWS Auto scale Instance-Based on RabbitMQ Custom Metrics".
