All posts by Pascal Vogel

Introducing Amazon MQ cross-Region data replication for ActiveMQ brokers

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/introducing-amazon-mq-cross-region-data-replication-for-activemq-brokers/

This post is written by Dominic Gagné, Senior Software Development Engineer, and Vinodh Kannan Sadayamuthu, Senior Solutions Architect

Amazon MQ now supports cross-Region data replication for ActiveMQ brokers. This feature enables you to build regionally resilient messaging applications and makes it easier to set up cross-Region message replication between ActiveMQ brokers in Amazon MQ. This blog post explains how cross-Region data replication works in Amazon MQ, how to setup cross-Region replica brokers for ActiveMQ, and how to test promoting a replica broker.

Amazon MQ is a managed message broker service for Apache ActiveMQ and RabbitMQ that simplifies setting up and operating message brokers on AWS.

Cross-Region replication improves the resilience and disaster recovery capabilities of your systems. This new Amazon MQ feature makes it easier to increase resilience of your ActiveMQ messaging systems across AWS Regions.

How cross-Region data replication works in Amazon MQ for ActiveMQ

The Amazon MQ for ActiveMQ cross-Region data replication feature replicates broker state from the primary broker in one AWS Region to the replica broker in another Region. Broker state consists of messages that have been sent to a broker by a message producer. Additionally, message acknowledgments and transactions are replicated. Scheduled messages and broker XML configuration are not replicated from the primary to the replica broker.

State replication occurs asynchronously and runs in the background. When a message is sent to a cross-Region data replication enabled broker, the data is persisted both to the primary data store and also on a queue used to replicate data. The replica broker acts as a client of this queue and consumes data that represents broker state from the primary broker.

At any given moment, only the primary broker is available for client connections. The replica broker is a hot standby and passively replicates the primary broker’s state. However, it does not accept client connections. The following diagram shows a simplified version of a cross-Region data replication broker pair. All replication traffic is encrypted using TLS and remains within AWS’ private backbone.

Amazon MQ for ActiveMQ cross-region data replication architecture

Configuring cross-Region replica brokers for Amazon MQ for ActiveMQ

To set up a cross-Region replica broker, your Amazon MQ for ActiveMQ primary broker must meet the following eligibility criteria:

  • ActiveMQ version 5.17.6 or above
  • Instance size m5.large or higher
  • Active/standby broker deployment enabled
  • Be in the Running state

If you do not have an ActiveMQ broker that meets these criteria, see Creating and configuring an ActiveMQ broker for instructions on how to create a primary broker.

To configure cross-Region replication

  1. Navigate to the Amazon MQ console and choose Create replica broker.
    Amazon MQ console create replica broker
  2. Select a primary broker from the list of eligible primary brokers and choose Next.
    Amazon MQ console choose primary broker
  3. Under Replica broker details, select the Region for your replica broker and enter a Replica broker name.
    Amazon MQ console configure replica broker
  4. In the ActiveMQ console user for replica broker panel, enter a Username and Password for broker access.
    Amazon MQ console user for replica broker
  5. In the Data replication user to bridge access between brokers panel, enter a replication user Username and Password.
    Amazon MQ console user for replica broker
  6. In the Additional settings panel, keep the defaults and choose Next.
  7. Review the settings and choose Create replica broker.
    Note: The broker access type is automatically set based on the primary broker access type.
    Amazon MQ console create replica broker setting summary
  8. The creation process takes up to 25 minutes. Once the replica broker creation is complete, begin replication between the primary and the replica brokers by rebooting the primary broker.
  9. Once the primary broker is rebooted and its status is Running, you can see the replica details in the Data replication panel of the primary broker.
    Amazon MQ console broker replication details

Both brokers now synchronize with each other to establish an inter-Region network and connection through which broker state is replicated. Once both brokers are in the Running state, the primary broker accepts client connections and passes all broker state changes (messages, acknowledgments, transactions, etc.) to the replica broker.

The replica broker now asynchronously mirrors the state of the primary broker. However, it does not become available for client connections until it is promoted via a switchover or a failover. These operations are covered in the following section.

Testing data replication and promoting the replica broker

There are two ways to promote a replica broker: initiating a switchover or a failover.

Switchover Failover
  • Prioritizes consistency over availability.
  • Prioritizes availability over consistency.
  • Brokers are guaranteed to have identical states.
  • Brokers are not guaranteed to be in identical states.
  • Brokers may not be available immediately to serve client traffic.
  • Replica broker is immediately available to serve client traffic.

To initiate a failover or switchover

    1. Navigate to the Amazon MQ console, choose your primary broker, and log in to the ActiveMQ Web Console using the URLs located in the Connections panel.
    2. In the top menu, select Queues. You should be able to see four ActiveMQ.Plugin.Replication queues used by the replication feature.
      Active MQ console queues
    3. To test message replication from the primary to a replica broker, create a queue and send messages. To create the queue:
      • For Queue Name, enter TestQueue.
      • Choose Create.

      ActiveMQ console create queue

    4. Under Operations for the TestQueue, choose Send To and perform the following steps:
      • For Number of messages to send, enter 10 and keep the other defaults.
      • Under Message body, enter a test message.
      • Choose Send.

      ActiveMQ console send test message

    5. To promote the replica broker, navigate to the Amazon MQ console and change the Region to the AWS Region where the replica broker is located.
    6. Select the replica broker (in this example called Secondarybroker) and choose Promote replica.
      Amazon MQ console promote broker
    7. In the Promote replica broker pop-up window:
      • Select Failover or Switchover.
      • Enter confirm in text box.
      • Choose Confirm.

      Amazon MQ console confirm broker promotion

    8. While a replica broker is being promoted, its replication status changes to Promotion in progress. The corresponding primary broker’s replication status changes to Demotion in progress.

Replica Secondarybroker status – Promotion in progress:

Replica Secondarybroker status - Promotion in progress

Primary broker status – Demotion in progress:

Primary broker status - Demotion in progress

Secondarybroker status – Promoted to new primary broker:

Secondarybroker status – Promoted to new primary broker

  1. Once the Secondarybroker status is Running, log in to the ActiveMQ Web Console from the URLs located in the Connections panel. You can see the replicated messages sent from the former primary broker in Step 4 in the TestQueue:
    Replicated message from primary broker in TestQueue

Monitoring cross-Region data replication

To monitor cross-Region data replication progress, you can use the Amazon CloudWatch metrics TotalReplicationLag and ReplicationLag.

Amazon CloudWatch metrics TotalReplicationLag and ReplicationLag

You can use these two metrics to monitor the progress of a switchover. When their value reaches zero, the switchover will complete because the broker states have been synchronized and the replica broker begins accepting client connections. If the switchover does not progress fast enough, or if you need the replica broker to be immediately available to serve client traffic, you can request a failover at any time.

Note: A failover can interrupt an ongoing switchover. However, a switchover cannot interrupt an ongoing failover.

Issuing a failover request causes the replica broker to become immediately available, but does not provide any guarantees about what data has been replicated to the replica broker. This means that a failover can make data tracking and reconciliation more challenging for your client application than a switchover.

For this reason, we recommend that you always start with a switchover and interrupt it with a failover if necessary. To interrupt an ongoing switchover, follow the same steps as for promoting a replica broker, select the failover option, and confirm.

Note: If you fail back to the original primary broker, messages that are not replicated from the primary to the replica broker during the failover will still exist on the primary broker. Therefore, consumers must manage these messages. We recommend tracking the processed message IDs in a data store such as Amazon DynamoDB global tables and comparing the message to the processed message IDs.

If you no longer need to replicate broker data across Regions or if you need to delete a primary or replica broker, you must unpair the replica broker and reboot the primary broker. You can unpair the replica broker in the Amazon MQ console by following Delete a CRDR broker.

To unpair the broker using the AWS Command Line Interface (AWS CLI), run the following command, replacing the --broker-id with your primary broker ID:

aws mq update-broker --broker-id <primary broker ID> \
--data-replication-mode "NONE" \
--region us-east-1

Conclusion

Using the cross-Region data replication feature for Amazon MQ for ActiveMQ provides a straightforward way to implement cross-Region replication to improve the resilience of your architecture and meet your business continuity and disaster recovery requirements. This post explains how cross-Region data replication works in Amazon MQ, how to set up a cross-Region replica broker, and how to test and promote the replica broker.

For more details, see the Amazon MQ documentation.

For more serverless learning resources, visit Serverless Land.

Node.js 20.x runtime now available in AWS Lambda

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/node-js-20-x-runtime-now-available-in-aws-lambda/

This post is written by Pascal Vogel, Solutions Architect, and Andrea Amorosi, Senior Solutions Architect.

You can now develop AWS Lambda functions using the Node.js 20 runtime. This Node.js version is in active LTS status and ready for general use. To use this new version, specify a runtime parameter value of nodejs20.x when creating or updating functions or by using the appropriate container base image.

You can use Node.js 20 with Powertools for AWS Lambda (TypeScript), a developer toolkit to implement serverless best practices and increase developer velocity. Powertools for AWS Lambda includes proven libraries to support common patterns such as observability, Parameter Store integration, idempotency, batch processing, and more.

You can also use Node.js 20 with Lambda@Edge, allowing you to customize low-latency content delivered through Amazon CloudFront.

This blog post highlights important changes to the Node.js runtime, notable Node.js language updates, and how you can use the new Node.js 20 runtime in your serverless applications.

Node.js 20 runtime updates

Changes to Root CA certificate loading

By default, Node.js includes root certificate authority (CA) certificates from well-known certificate providers. Earlier Lambda Node.js runtimes up to Node.js 18 augmented these certificates with Amazon-specific CA certificates, making it easier to create functions accessing other AWS services. For example, it included the Amazon RDS certificates necessary for validating the server identity certificate installed on your Amazon RDS database.

However, loading these additional certificates has a performance impact during cold start. Starting with Node.js 20, Lambda no longer loads these additional CA certificates by default. The Node.js 20 runtime contains a certificate file with all Amazon CA certificates located at /var/runtime/ca-cert.pem. By setting the NODE_EXTRA_CA_CERTS environment variable to /var/runtime/ca-cert.pem, you can restore the behavior from Node.js 18 and earlier runtimes.

This causes Node.js to validate and load all Amazon CA certificates during a cold start. It can take longer compared to loading only specific certificates. For the best performance, we recommend bundling only the certificates that you need with your deployment package and loading them via NODE_EXTRA_CA_CERTS. The certificates file should consist of one or more trusted root or intermediate CA certificates in PEM format.

For example, for RDS, include the required certificates alongside your code as certificates/rds.pem and then load it as follows:

NODE_EXTRA_CA_CERTS=/var/task/certificates/rds.pem

See Using Lambda environment variables in the AWS Lambda Developer Guide for detailed instructions for setting environment variables.

Amazon Linux 2023

The Node.js 20 runtime is based on the provided.al2023 runtime. The provided.al2023 runtime in turn is based on the Amazon Linux 2023 minimal container image release and brings several improvements over Amazon Linux 2 (AL2).

provided.al2023 contains only the essential components necessary to install other packages and offers a smaller deployment footprint with a compressed image size of less than 40MB compared to the over 100MB AL2-based base image.

With glibc version 2.34, customers have access to a more recent version of glibc, updated from version 2.26 in AL2-based images.

The Amazon Linux 2023 minimal image uses microdnf as package manager, symlinked as dnf, replacing yum in AL2-based images. Additionally, curl and gnupg2 are also included as their minimal versions curl-minimal and gnupg2-minimal.

Learn more about the provided.al2023 runtime in the blog post Introducing the Amazon Linux 2023 runtime for AWS Lambda and the Amazon Linux 2023 launch blog post.

Runtime Interface Client

The Node.js 20 runtime uses the open source AWS Lambda NodeJS Runtime Interface Client (RIC). You can now use the same RIC version in your Open Container Initiative (OCI) Lambda container images as the one used by the managed Node.js 20 runtime.

The Node.js 20 runtime supports Lambda response streaming which enables you to send response payload data to callers as it becomes available. Response streaming can improve application performance by reducing time-to-first-byte, can indicate progress during long-running tasks, and allows you to build functions that return payloads larger than 6MB, which is the Lambda limit for buffered responses.

Setting Node.js heap memory size

Node.js allows you to configure the heap size of the v8 engine via the --max-old-space-size and --max-semi-space-size options. By default, Lambda overrides the Node.js default values with values derived from the memory size configured for the function. If you need control over your runtime’s memory allocation, you can now set both of these options using the NODE_OPTIONS environment variable, without needing an exec wrapper script. See Using Lambda environment variables in the AWS Lambda Developer Guide for details.

Use the --max-old-space-size option to set the max memory size of V8’s old memory section, and the --max-semi-space-size option to set the maximum semispace size for V8’s garbage collector. See the Node.js documentation for more details on these options.

Node.js 20 language updates

Language features

With this release, Lambda customers can take advantage of new Node.js 20 language features, including:

  • HTTP(S)/1.1 default keepAlive: Node.js now sets keepAlive to true by default. Any outgoing HTTPs connections use HTTP 1.1 keep-alive with a default waiting window of 5 seconds. This can deliver improved throughput as connections are reused by default.
  • Fetch API is enabled by default: The global Node.js Fetch API is enabled by default. However, it is still an experimental module.
  • Faster URL parsing: Node.js 20 comes with the Ada 2.0 URL parser which brings performance improvements to URL parsing. This has also been back-ported to Node.js 18.7.0.
  • Web Crypto API now stable: The Node.js implementation of the standard Web Crypto API has been marked as stable. You can access the provided cryptographic primitives through globalThis.crypto.
  • Web assembly support: Node.js 20 enables the experimental WebAssembly System Interface (WASI) API by default without the need to set an experimental flag.

For a detailed overview of Node.js 20 language features, see the Node.js 20 release blog post and the Node.js 20 changelog.

Performance considerations

Node.js 19.3 introduced a change that impacts how non-essential modules are lazy-loaded during the Node.js process startup. In terms of the impact to your Lambda functions, this reduces the work during initialization of each execution environment, then if used, the modules will instead be loaded during the first function invoke. This change remains in Node.js 20.

Builders should continue to measure and test function performance and optimize function code and configuration for any impact. To learn more about how to optimize Node.js performance in Lambda, see Performance optimization in the Lambda Operator Guide, and our blog post Optimizing Node.js dependencies in AWS Lambda.

Migration from earlier Node.js runtimes

Migration from Node.js 16

Lambda occasionally delays deprecation of a Lambda runtime for a limited period beyond the end of support date of the language version that the runtime supports. During this period, Lambda only applies security patches to the runtime OS. Lambda doesn’t apply security patches to programming language runtimes after they reach their end of support date.

In the case of Node.js 16, we have delayed deprecation from the community end of support date on September 11, 2023, to June 12, 2024. This gives customers the opportunity to migrate directly from Node.js 16 to Node.js 20, skipping Node.js 18.

AWS SDK for JavaScript

Up until Node.js 16, Lambda’s Node.js runtimes included the AWS SDK for JavaScript version 2. This has since been superseded by the AWS SDK for JavaScript version 3, which was released in December 2020. Starting with Node.js 18, and continuing with Node.js 20, the Lambda Node.js runtimes have upgraded the version of the AWS SDK for JavaScript included in the runtime from v2 to v3. Customers upgrading from Node.js 16 or earlier runtimes who are using the included AWS SDK for JavaScript v2 should upgrade their code to use the v3 SDK.

For optimal performance, and to have full control over your code dependencies, we recommend bundling and minifying the AWS SDK in your deployment package, rather than using the SDK included in the runtime. For more information, see Optimizing Node.js dependencies in AWS Lambda.

Using the Node.js 20 runtime in AWS Lambda

AWS Management Console

To use the Node.js 20 runtime to develop your Lambda functions, specify a runtime parameter value Node.js 20.x when creating or updating a function. The Node.js 20 runtime version is now available in the Runtime dropdown on the Create function page in the AWS Lambda console:

Select Node.js 20.x when creating a new AWS Lambda function in the AWS Management Console

To update an existing Lambda function to Node.js 20, navigate to the function in the Lambda console, then choose Edit in the Runtime settings panel. The new version of Node.js is available in the Runtime dropdown:

Select Node.js 20.x when updating an existing AWS Lambda function in the AWS Management Console

AWS Lambda – Container Image

Change the Node.js base image version by modifying the FROM statement in your Dockerfile:


FROM public.ecr.aws/lambda/nodejs:20
# Copy function code
COPY lambda_handler.xx ${LAMBDA_TASK_ROOT}

Customers running Node.js 20 Docker images locally, including customers using AWS SAM, will need to upgrade their Docker install to version 20.10.10 or later.

AWS Serverless Application Model (AWS SAM)

In AWS SAM, set the Runtime attribute to node20.x to use this version:


AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31

Resources:
  MyFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: lambda_function.lambda_handler
      Runtime: nodejs20.x
      CodeUri: my_function/.
      Description: My Node.js Lambda Function

AWS Cloud Development Kit (AWS CDK)

In the AWS CDK, set the runtime attribute to Runtime.NODEJS_20_X to use this version:


import * as cdk from "aws-cdk-lib";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as path from "path";
import { Construct } from "constructs";

export class CdkStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // The code that defines your stack goes here

    // The Node.js 20 enabled Lambda Function
    const lambdaFunction = new lambda.Function(this, "node20LambdaFunction", {
      runtime: lambda.Runtime.NODEJS_20_X,
      code: lambda.Code.fromAsset(path.join(__dirname, "/../lambda")),
      handler: "index.handler",
    });
  }
}

Conclusion

Lambda now supports Node.js 20. This release uses the Amazon Linux 2023 OS, supports configurable CA certificate loading for faster cold starts, as well as other improvements detailed in this blog post.

You can build and deploy functions using the Node.js 20 runtime using the AWS Management Console, AWS CLI, AWS SDK, AWS SAM, AWS CDK, or your choice of Infrastructure as Code (IaC). You can also use the Node.js 20 container base image if you prefer to build and deploy your functions using container images.

The Node.js 20 runtime empowers developers to build more efficient, powerful, and scalable serverless applications. Try the Node.js runtime in Lambda today and read about the Node.js programming model in the Lambda documentation to learn more about writing functions in Node.js 20.

For more serverless learning resources, visit Serverless Land.

Converting Apache Kafka events from Avro to JSON using EventBridge Pipes

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/converting-apache-kafka-events-from-avro-to-json-using-eventbridge-pipes/

This post is written by Pascal Vogel, Solutions Architect, and Philipp Klose, Global Solutions Architect.

Event streaming with Apache Kafka has become an important element of modern data-oriented and event-driven architectures (EDAs), unlocking use cases such as real-time analytics of user behavior, anomaly and fraud detection, and Internet of Things event processing. Stream producers and consumers in Kafka often use schema registries to ensure that all components follow agreed-upon event structures when sending (serializing) and processing (deserializing) events to avoid application bugs and crashes.

A common schema format in Kafka is Apache Avro, which supports rich data structures in a compact binary format. To integrate Kafka with other AWS and third-party services more easily, AWS offers Amazon EventBridge Pipes, a serverless point-to-point integration service. However, many downstream services expect JSON-encoded events, requiring custom, and repetitive schema validation and conversion logic from Avro to JSON in each downstream service.

This blog post shows how to reliably consume, validate, convert, and send Avro events from Kafka to AWS and third-party services using EventBridge Pipes, allowing you to reduce custom deserialization logic in downstream services. You can also use EventBridge event buses as targets in Pipes to filter and distribute events from Pipes to multiple targets, including cross-account and cross-Region delivery.

This blog describes two scenarios:

  1. Using Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS Glue Schema Registry.
  2. Using Confluent Cloud and the Confluent Schema Registry.

See the associated GitHub repositories for Glue Schema Registry or Confluent Schema Registry for full source code and detailed deployment instructions.

Kafka event streaming and schema validation on AWS

To build event streaming applications with Kafka on AWS, you can use Amazon MSK, offerings such as Confluent Cloud, or self-hosted Kafka on Amazon Elastic Compute Cloud (Amazon EC2) instances.

To avoid common issues in event streaming and event-driven architectures, such as data inconsistencies and incompatibilities, it is a recommended practice to define and share event schemas between event producers and consumers. In Kafka, schema registries are used to manage, evolve, and enforce schemas for event producers and consumers. The AWS Glue Schema Registry provides a central location to discover, manage, and evolve schemas. In the case of Confluent Cloud, the Confluent Schema Registry serves the same role. Both the Glue Schema Registry and the Confluent Schema Registry support common schema formats such as Avro, Protobuf, and JSON.

To integrate Kafka with AWS services, third-party services, and your own applications, you can use EventBridge Pipes. EventBridge Pipes helps you create point-to-point integrations between event sources and targets with optional filtering, transformation, and enrichment. EventBridge Pipes reduces the amount of integration code that you have to write and maintain when building EDAs.

Many AWS and third-party services expect JSON-encoded payloads (events) as input, meaning they cannot directly consume Avro or Protobuf payloads. To replace repetitive Avro-to-JSON validation and conversion logic in each consumer, you can use the EventBridge Pipes enrichment step. This solution uses an AWS Lambda function in the enrichment step to deserialize and validate Kafka events with a schema registry, including error handling with dead-letter queues, and convert events to JSON before passing them to downstream services.

Solution overview

Architecture overview of the solution

The solution presented in this blog post consists of the following key elements:

  1. The source of the pipe is a Kafka cluster deployed using MSK or Confluent Cloud. EventBridge Pipes reads events from the Kafka stream in batches and sends them to the enrichment function (see here for an example event).
  2. The enrichment step (Lambda function) deserializes and validates the events against the configured schema registry (Glue or Confluent), converts events from Avro to JSON with integrated error handling, and returns them to the pipe.
  3. The target of this example solution is an EventBridge custom event bus that is invoked by EventBridge Pipes with JSON-encoded events returned by the enrichment Lambda function. EventBridge Pipes supports a variety of other targets, including Lambda, AWS Step Functions, Amazon API Gateway, API destinations, and more, enabling you to build EDAs without writing integration code.
  4. In this sample solution, the event bus sends all events to Amazon CloudWatch Logs via an EventBridge rule. You can extend the example to invoke additional EventBridge targets.

Optionally, you can add OpenAPI 3 or JSONSchema Draft 4 schemas for your events in the EventBridge schema registry by either manually generating it from the Avro schema or using EventBridge schema discovery. This allows you to download code bindings for the JSON-converted events for various programming languages, such as JavaScript, Python, and Java, to correctly use them in your EventBridge targets.

The remainder of this blog post describes this solution for the Glue and Confluent schema registries with code examples.

EventBridge Pipes with the Glue Schema Registry

This section describes how to implement event schema validation and conversion from Avro to JSON using EventBridge Pipes and the Glue Schema Registry. You can find the source code and detailed deployment instructions on GitHub.

Prerequisites

You need an Amazon MSK serverless cluster running and the Glue Schema registry configured. This example includes a Avro schema and a Glue Schema Registry. See the following AWS blog post for an introduction to schema validation with the Glue Schema Registry: Validate, evolve, and control schemas in Amazon MSK and Amazon Kinesis Data Streams with AWS Glue Schema Registry.

EventBridge Pipes configuration

Use the AWS Cloud Development Kit (AWS CDK) template provided in the GitHub repository to deploy:

  1. An EventBridge pipe that connects to your existing Amazon MSK Serverless Kafka topic as the source via AWS Identity and Access Management (IAM) authentication.
  2. EventBridge Pipes reads events from your Kafka topic using the Amazon MSK source type.
  3. An enrichment Lambda function in Java to perform event deserialization, validation, and conversion from Avro to JSON.
  4. An Amazon Simple Queue Service (Amazon SQS) dead letter queue to hold events for which deserialization failed.
  5. An EventBridge custom event bus as the pipe target. An EventBridge rule sends all incoming events into a CloudWatch Logs log group.

For MSK-based sources, EventBridge supports configuration parameters, such as batch window, batch size, and starting position, which you can set using the parameters of the CfnPipe class in the example CDK stack.

The example EventBridge pipe consumes events from Kafka in batches of 10 because it is targeting an EventBridge event bus, which has a max batch size of 10. See batching and concurrency in the EventBridge Pipes User Guide to choose an optimal configuration for other targets.

EventBridge Pipes with the Confluent Schema Registry

This section describes how to implement event schema validation and conversion from Avro to JSON using EventBridge Pipes and the Confluent Schema Registry. You can find the source code and detailed deployment instructions on GitHub.

Prerequisites

To set up this solution, you need a Kafka stream running on Confluent Cloud as well as the Confluent Schema Registry set up. See the corresponding Schema Registry tutorial for Confluent Cloud to set up a schema registry for your Confluent Kafka stream.

To connect to your Confluent Cloud Kafka cluster, you need an API key for Confluent Cloud and Confluent Schema Registry. AWS Secrets Manager is used to securely store your Confluent secrets.

EventBridge Pipes configuration

Use the AWS CDK template provided in the GitHub repository to deploy:

  1. An EventBridge pipe that connects to your existing Confluent Kafka topic as the source via an API secret stored in Secrets Manager.
  2. EventBridge Pipes reads events from your Confluent Kafka topic using the self-managed Apache Kafka stream source type, which includes all non-MSK Kafka clusters.
  3. An enrichment Lambda function in Python to perform event deserialization, validation, and conversion from Avro to JSON.
  4. An SQS dead letter queue to hold events for which deserialization failed.
  5. An EventBridge custom event bus as the pipe target. An EventBridge rule writes all incoming events into a CloudWatch Logs log group.

For self-managed Kafka sources, EventBridge supports configuration parameters, such as batch window, batch size, and starting position, which you can set using the parameters of the CfnPipe class in the example CDK stack.

The example EventBridge pipe consumes events from Kafka in batches of 10 because it is targeting an EventBridge event bus, which has a max batch size of 10. See batching and concurrency in the EventBridge Pipes User Guide to choose an optimal configuration for other targets.

Enrichment Lambda functions

Both of the solutions described previously include an enrichment Lambda function for schema validation and conversion from Avro to JSON.

The Java Lambda function integrates with the Glue Schema Registry using the AWS Glue Schema Registry Library. The Python Lambda function integrates with the Confluent Schema Registry using the confluent-kafka library and uses Powertools for AWS Lambda (Python) to implement Serverless best practices such as logging and tracing.

The enrichment Lambda functions perform the following tasks:

  1. In the events polled from the Kafka stream by the EventBridge pipe, the key and value of the event are base64 encoded. Therefore, for each event in the batch passed to the function, the key and the value are decoded.
  2. The event key is assumed to be serialized by the producer as a string type.
  3. The event value is deserialized using the Glue Schema registry Serde (Java) or the confluent-kafka AvroDeserializer (Python).
  4. The function then returns the successfully converted JSON events to the EventBridge pipe, which then invokes the target for each of them.
  5. Events for which Avro deserialization failed are sent to the SQS dead letter queue.

Conclusion

This blog post shows how to implement event consumption, Avro schema validation, and conversion to JSON using Amazon EventBridge Pipes, Glue Schema Registry, and Confluent Schema Registry.

The source code for the presented example is available in the AWS Samples GitHub repository for Glue Schema Registry and Confluent Schema Registry. For more patterns, visit the Serverless Patterns Collection.

For more serverless learning resources, visit Serverless Land.

Building a serverless document chat with AWS Lambda and Amazon Bedrock

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/building-a-serverless-document-chat-with-aws-lambda-and-amazon-bedrock/

This post is written by Pascal Vogel, Solutions Architect, and Martin Sakowski, Senior Solutions Architect.

Large language models (LLMs) are proving to be highly effective at solving general-purpose tasks such as text generation, analysis and summarization, translation, and much more. Because they are trained on large datasets, they can use a broad generalist knowledge base. However, as training takes place offline and uses publicly available data, their ability to access specialized, private, and up-to-date knowledge is limited.

One way to improve LLM knowledge in a specific domain is fine-tuning them on domain-specific datasets. However, this is time and resource intensive, requires specialized knowledge, and may not be appropriate for some tasks. For example, fine-tuning won’t allow an LLM to access information with daily accuracy.

To address these shortcomings, Retrieval Augmented Generation (RAG) is proving to be an effective approach. With RAG, data external to the LLM is used to augment prompts by adding relevant retrieved data in the context. This allows for integrating disparate data sources and the complete separation of data sources from the machine learning model entirely.

Tools such as LangChain or LlamaIndex are gaining popularity because of their ability to flexibly integrate with a variety of data sources such as (vector) databases, search engines, and current public data sources.

In the context of LLMs, semantic search is an effective search approach, as it considers the context and intent of user-provided prompts as opposed to a traditional literal search. Semantic search relies on word embeddings, which represent words, sentences, or documents as vectors. Consequently, documents must be transformed into embeddings using an embedding model as the basis for semantic search. Because this embedding process only needs to happen when a document is first ingested or updated, it’s a great fit for event-driven compute with AWS Lambda.

This blog post presents a solution that allows you to ask natural language questions of any PDF document you upload. It combines the text generation and analysis capabilities of an LLM with a vector search on the document content. The solution uses serverless services such as AWS Lambda to run LangChain and Amazon DynamoDB for conversational memory.

Amazon Bedrock is used to provide serverless access to foundational models such as Amazon Titan and models developed by leading AI startups, such as AI21 Labs, Anthropic, and Cohere. See the GitHub repository for a full list of available LLMs and deployment instructions.

You learn how the solution works, what design choices were made, and how you can use it as a blueprint to build your own custom serverless solutions based on LangChain that go beyond prompting individual documents. The solution code and deployment instructions are available on GitHub.

Solution overview

Let’s look at how the solution works at a high level before diving deeper into specific elements and the AWS services used in the following sections. The following diagram provides a simplified view of the solution architecture and highlights key elements:

The process of interacting with the web application looks like this:

  1. A user uploads a PDF document into an Amazon Simple Storage Service (Amazon S3) bucket through a static web application frontend.
  2. This upload triggers a metadata extraction and document embedding process. The process converts the text in the document into vectors. The vectors are loaded into a vector index and stored in S3 for later use.
  3. When a user chats with a PDF document and sends a prompt to the backend, a Lambda function retrieves the index from S3 and searches for information related to the prompt.
  4. An LLM then uses the results of this vector search, previous messages in the conversation, and its general-purpose capabilities to formulate a response to the user.

As can be seen on the following screenshot, the web application deployed as part of the solution allows you to upload documents and list uploaded documents and their associated metadata, such as number of pages, file size, and upload date. The document status indicates if a document is successfully uploaded, is being processed, or is ready for a conversation.

Web application document list view

By clicking on one of the processed documents, you can access a chat interface, which allows you to send prompts to the backend. It is possible to have multiple independent conversations with each document with separate message history.

Web application chat view

Embedding documents

Solution architecture diagram excerpt: embedding documents

When a new document is uploaded to the S3 bucket, an S3 event notification triggers a Lambda function that extracts metadata, such as file size and number of pages, from the PDF file and stores it in a DynamoDB table. Once the extraction is complete, a message containing the document location is placed on an Amazon Simple Queue Service (Amazon SQS) queue. Another Lambda function polls this queue using Lambda event source mapping. Applying the decouple messaging pattern to the metadata extraction and document embedding functions ensures loose coupling and protects the more compute-intensive downstream embedding function.

The embedding function loads the PDF file from S3 and uses a text embedding model to generate a vector representation of the contained text. LangChain integrates with text embedding models for a variety of LLM providers. The resulting vector representation of the text is loaded into a FAISS index. FAISS is an open source vector store that can run inside the Lambda function memory using the faiss-cpu Python package. Finally, a dump of this FAISS index is stored in the S3 bucket besides the original PDF document.

Generating responses

Solution architecture diagram excerpt: generating responses

When a prompt for a specific document is submitted via the Amazon API Gateway REST API endpoint, it is proxied to a Lambda function that:

  1. Loads the FAISS index dump of the corresponding PDF file from S3 and into function memory.
  2. Performs a similarity search of the FAISS vector store based on the prompt.
  3. If available, retrieves a record of previous messages in the same conversation via the DynamoDBChatMessageHistory integration. This integration can store message history in DynamoDB. Each conversation is identified by a unique ID.
  4. Finally, a LangChain ConversationalRetrievalChain passes the combination of the prompt submitted by the user, the result of the vector search, and the message history to an LLM to generate a response.

Web application and file uploads

Solution architecture diagram excerpt: web application

A static web application serves as the frontend for this solution. It’s built with React, TypeScriptVite, and TailwindCSS and deployed via AWS Amplify Hosting, a fully managed CI/CD and hosting service for fast, secure, and reliable static and server-side rendered applications. To protect the application from unauthorized access, it integrates with an Amazon Cognito user pool. The API Gateway uses an Amazon Cognito authorizer to authenticate requests.

Users upload PDF files directly to the S3 bucket using S3 presigned URLs obtained via the REST API. Several Lambda functions implement API endpoints used to create, read, and update document metadata in a DynamoDB table.

Extending and adapting the solution

The solution provided serves as a blueprint that can be enhanced and extended to develop your own use cases based on LLMs. For example, you can extend the solution so that users can ask questions across multiple PDF documents or other types of data sources. LangChain makes it easy to load different types of data into vector stores, which you can then use for semantic search.

Once your use case involves searching across multiple documents, consider moving from loading vectors into memory with FAISS to a dedicated vector database. There are several options for vector databases on AWS. One serverless option is Amazon Aurora Serverless v2 with the pgvector extension for PostgreSQL. Alternatively, vector databases developed by AWS Partners such as Pinecone or MongoDB Atlas Vector Search can be integrated with LangChain. Besides vector search, LangChain also integrates with traditional external data sources, such as the enterprise search service Amazon Kendra, Amazon OpenSearch, and many other data sources.

The solution presented in this blog post uses similarity search to find information in the vector database that closely matches the user-supplied prompt. While this works well in the presented use case, you can also use other approaches, such as maximal marginal relevance, to find the most relevant information to provide to the LLM. When searching across many documents and receiving many results, techniques such as MapReduce can improve the quality of the LLM responses.

Depending on your use case, you may also want to select a different LLM to achieve an ideal balance between quality of results and cost. Amazon Bedrock is a fully managed service that makes foundational models (FMs) from leading AI startups and Amazon available via an API, so you can choose from a wide range of FMs to find the model that’s best suited for your use case. You can use models such as Amazon Titan, Jurassic-2 from AI21 Labs, or Anthropic Claude.

To further optimize the user experience of your generative AI application, consider streaming LLM responses to your frontend in real-time using Lambda response streaming and implementing real-time data updates using AWS AppSync subscriptions or Amazon API Gateway WebSocket APIs.

Conclusion

AWS serverless services make it easier to focus on building generative AI applications by providing automatic scaling, built-in high availability, and a pay-for-use billing model. Event-driven compute with AWS Lambda is a good fit for compute-intensive, on-demand tasks such as document embedding and flexible LLM orchestration.

The solution in this blog post combines the capabilities of LLMs and semantic search to answer natural language questions directed at PDF documents. It serves as a blueprint that can be extended and adapted to fit further generative AI use cases.

Deploy the solution by following the instructions in the associated GitHub repository.

For more serverless learning resources, visit Serverless Land.

Implementing idempotent AWS Lambda functions with Powertools for AWS Lambda (TypeScript)

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/implementing-idempotent-aws-lambda-functions-with-powertools-for-aws-lambda-typescript/

This post is written by Alexander Schüren, Sr Specialist SA, Powertools.

One of the design principles of AWS Lambda is to “develop for retries and failures”. If your function fails, the Lambda service will retry and invoke your function again with the same event payload. Therefore, when your function performs tasks such as processing orders or making reservations, it is necessary for your Lambda function to handle requests idempotently to avoid duplicate payment or order processing, which can result in a poor customer experience.

This article explains what idempotency is and how to make your Lambda functions idempotent using the idempotency utility for Powertools for AWS Lambda (TypeScript). The Powertools idempotency utility for TypeScript was co-developed with Vanguard and is now generally available.

Understanding idempotency

Idempotency is the property of an operation that can be applied multiple times without changing the result beyond the initial execution. You can safely run an idempotent operation multiple times without side effects, such as duplicate records or data inconsistencies. This is especially relevant for payment and order processing or third-party API integrations.

There are key concepts to consider when implementing idempotency in AWS Lambda. For each invocation, you specify which subset of the event payload you want to use to identify an idempotent request. This is called the idempotency key. This key can be a single field such as transactionId, a combination of multiple fields such as customerId and requestId, or the entire event payload.

Because timestamps, dates, and other generated values within the payload affect the idempotency key, we recommend that you define specific fields rather than using the entire event payload.

By evaluating the idempotency key, you can then decide if the function needs to run again or send an existing response to the client. To do this, you need to store the following information for each request in a persistence layer (i.e., Amazon DynamoDB):

  • Status: IN_PROGRESS, EXPIRED, COMPLETE
  • Response data: the response to send back to the client instead of executing the function again
  • Expiration timestamp: when the idempotency record becomes invalid for reuse

The following diagram shows a successful request flow for this idempotency scenario:

Request flow for idempotent Lambda function

When you invoke a Lambda function with a particular event for the first time, it stores a record with a unique idempotency key tied to an event payload in the persistence layer.

The function then executes its code and updates the record in the persistence layer with the function response. For subsequent invocations with the same payload, you must check if the idempotency key exists in the persistence layer. If it exists, the function returns the same response to the client. This prevents multiple invocations of the function, making it idempotent.

There are more edge cases to be mindful of, such as when the idempotency record has expired, or handling of failures between the client, the Lambda function, and the persistence layer. The Powertools for AWS Lambda (TypeScript) documentation covers all request flows in detail.

Idempotency with Powertools for AWS Lambda (TypeScript)

Powertools for AWS Lambda, available in PythonJava, .NET, and TypeScript, provides utilities for Lambda functions to ease the adoption of best practices and to reduce the amount of code needed to perform recurring tasks. In particular, it provides a module to handle idempotency.

This post shows examples using the TypeScript version of Powertools. To get started with the Powertools idempotency module, you must install the library and configure it within your build process. For more details, follow the Powertools for AWS Lambda documentation.

Getting started

Powertools for AWS Lambda (TypeScript) is modular, meaning you can install the idempotency utility independently from the Logger, Tracing, Metrics, or other packages. Install the idempotency utility library and the AWS SDK v3 client for DynamoDB in your project using npm:

npm i @aws-lambda-powertools/idempotency @aws-sdk/client-dynamodb @aws-sdk/lib-dynamodb

Before getting started, you need to create a persistent storage layer where the idempotency utility can store its state. Your Lambda function AWS Identity and Access Management (IAM) role must have dynamodb:GetItem, dynamodb:PutItem, dynamodb:UpdateItem and dynamodb:DeleteItem permissions.

Currently, DynamoDB is the only supported persistent storage layer, so you’ll need to create a table first. Use the AWS Cloud Development Kit (CDK), AWS CloudFormation, AWS Serverless Application Model (SAM) or any Infrastructure as Code tool of your choice that supports DynamoDB resources.

The following sections illustrate how to instrument your Lambda function code to make it idempotent using a wrapper function or using middy middleware.

Using the function wrapper

Assuming you have created a DynamoDB table with the name IdempotencyTable, create a persistence layer in your Lambda function code:

import { makeIdempotent } from "@aws-lambda-powertools/idempotency";
import { DynamoDBPersistenceLayer } from "@aws-lambda-powertools/idempotency/dynamodb";

const persistenceStore = new DynamoDBPersistenceLayer({
  tableName: "IdempotencyTable",
});

Now, apply the makeIdempotent function wrapper to your Lambda function handler to make it idempotent and use the previously configured persistence store.

import { makeIdempotent } from '@aws-lambda-powertools/idempotency';
import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb';
import type { Context } from 'aws-lambda';
import type { Request, Response, SubscriptionResult } from './types';

export const handler = makeIdempotent(
  async (event: Request, _context: Context): Promise<Response> => {
    try {
      const payment = … // create payment
	  
      return {
        paymentId: payment.id,
        message: 'success',
        statusCode: 200,
      };

    } catch (error) {
      throw new Error('Error creating payment');
    }
  },
  {
    persistenceStore,
  }
);

The function processes the incoming event to create a payment and return the paymentId, message, and status back to the client. Making the Lambda function handler idempotent ensures that payments are only processed once, despite multiple Lambda invocations with the same event payload. You can also apply the makeIdempotent function wrapper to any other function outside of your handler.

Use the following type definitions for this example by adding a types.ts file to your source folder:

type Request = {
  user: string;
  productId: string;
};

type Response = {
  [key: string]: unknown;
};

type SubscriptionResult = {
  id: string;
  productId: string;
};

Using middy middleware

If you are using middy middleware, Powertools provides makeHandlerIdempotent middleware to make your Lambda function handler idempotent:

import { makeHandlerIdempotent } from '@aws-lambda-powertools/idempotency/middleware';
import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb';
import middy from '@middy/core';
import type { Context } from 'aws-lambda';
import type { Request, Response, SubscriptionResult } from './types';

const persistenceStore = new DynamoDBPersistenceLayer({
  tableName: 'IdempotencyTable',
});

export const handler = middy(
  async (event: Request, _context: Context): Promise<Response> => {
    try {
      const payment = … // create payment object
	  
      return {
        paymentId: payment.id,
        message: 'success',
        statusCode: 200,
      };
    } catch (error) {
      throw new Error('Error creating payment');
    }
  }
).use(
    makeHandlerIdempotent({
      persistenceStore,
  })
);

Configuration options

The Powertools idempotency utility comes with several configuration options to change the idempotency behavior that will fit your use case scenario. This section highlights the most common configurations. You can find all available customization options in the AWS Powertools for Lambda (TypeScript) documentation.

Persistence layer options

When you create a DynamoDBPersistenceLayer object, only the tableName attribute is required. Powertools will expect the table with a partition key id and will create other attributes with default values.

You can change these default values if needed by passing the options parameter:

import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb';

const persistenceStore = new DynamoDBPersistenceLayer({
  tableName: 'idempotencyTableName',
  keyAttr: 'idempotencyKey', // default: id
  expiryAttr: 'expiresAt', // default: expiration
  inProgressExpiryAttr: 'inProgressExpiresAt', // default: in_progress_expiration
  statusAttr: 'currentStatus', // default: status
  dataAttr: 'resultData', // default: data
  validationKeyAttr: 'validationKey', .// default validation
});

Using a subset of the event payload

When you configure idempotency for your Lambda function handler, Powertools will use the entire event payload for idempotency handling by hashing the object.

However, events from AWS services such as Amazon API Gateway or Amazon Simple Queue Service (Amazon SQS) often have generated fields, such as timestamp or requestId. This results in Powertools treating each event payload as unique.

To prevent that, create an IdempotencyConfig and configure which part of the payload should be hashed for the idempotency logic.

Create the IdempotencyConfig and set eventKeyJmespath to a key within your event payload:

import { IdempotencyConfig } from '@aws-lambda-powertools/idempotency';

// Extract the idempotency key from the request headers
const config = new IdempotencyConfig({
  eventKeyJmesPath: 'headers."X-Idempotency-Key"',
});

Use the X-Idempotency-Key header for your idempotency key. Subsequent invocations with the same header value will be idempotent.

You can then add the configuration to the makeIdempotent function wrapper from the previous example:

export const handler = makeIdempotent(
  async (event: Request, _context: Context): Promise<Response> => {
    try {
      const payment = … // create payment
      
	  return {
        paymentId: payment.id,
        message: 'success',
        statusCode: 200,
      };
    } catch (error) {
      throw new Error('Error creating payment');
    }
  },
  {
    persistenceStore,
    config
  }
);

The event payload should contain X-Idempotency-Key in the headers, so Powertools can use this field to handle idempotency:

{
  "version": "2.0",
  "routeKey": "ANY /createpayment",
  "rawPath": "/createpayment",
  "rawQueryString": "",
  "headers": {
    "Header1": "value1",
    "X-Idempotency-Key": "abcdefg"
  },
  "requestContext": {
    "accountId": "123456789012",
    "apiId": "api-id",
    "domainName": "id.execute-api.us-east-1.amazonaws.com",
    "domainPrefix": "id",
    "http": {
      "method": "POST",
      "path": "/createpayment",
      "protocol": "HTTP/1.1",
      "sourceIp": "ip",
      "userAgent": "agent"
    },
    "requestId": "id",
    "routeKey": "ANY /createpayment",
    "stage": "$default",
    "time": "10/Feb/2021:13:40:43 +0000",
    "timeEpoch": 1612964443723
  },
  "body": "{\"user\":\"xyz\",\"productId\":\"123456789\"}",
  "isBase64Encoded": false
}

There are other configuration options you can apply, such as payload validation, expiration duration, local caching, and others. See the Powertools for AWS Lambda (TypeScript) documentation for more information.

Customizing the AWS SDK configuration

The DynamoDBPersistenceLayer is built-in and allows you to store the idempotency data for all your requests. Under the hood, Powertools uses the AWS SDK for JavaScript v3. Change the SDK configuration by passing a clientConfig object.

The following sample sets the region to eu-west-1:

import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb';

const persistenceStore = new DynamoDBPersistenceLayer({
  tableName: 'IdempotencyTable',
  clientConfig: {
    region: 'eu-west-1',
  },
});

If you are using your own client, you can pass it the persistence layer:

import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';

const ddbClient = new DynamoDBClient({ region: 'eu-west-1' });

const dynamoDBPersistenceLayer = new DynamoDBPersistenceLayer({
  tableName: 'IdempotencyTable',
  awsSdkV3Client: ddbClient,
});

Conclusion

Making your Lambda functions idempotent can be a challenge and, if not done correctly, can lead to duplicate data, inconsistencies, and a bad customer experience. This post shows how to use Powertools for AWS Lambda (TypeScript) to process your critical transactions only once when using AWS Lambda.

For more details on the Powertools idempotency feature and its configuration options, see the full documentation.

For more serverless learning resources, visit Serverless Land.

Integrating IBM MQ with Amazon SQS and Amazon SNS using Apache Camel

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/integrating-ibm-mq-with-amazon-sqs-and-amazon-sns-using-apache-camel/

This post is written by Joaquin Rinaudo, Principal Security Consultant and Gezim Musliaj, DevOps Consultant.

IBM MQ is a message-oriented middleware (MOM) product used by many enterprise organizations, including global banks, airlines, and healthcare and insurance companies.

Customers often ask us for guidance on how they can integrate their existing on-premises MOM systems with new applications running in the cloud. They’re looking for a cost-effective, scalable and low-effort solution that enables them to send and receive messages from their cloud applications to these messaging systems.

This blog post shows how to set up a bi-directional bridge from on-premises IBM MQ to Amazon MQ, Amazon Simple Queue Service (Amazon SQS), and Amazon Simple Notification Service (Amazon SNS).

This allows your producer and consumer applications to integrate using fully managed AWS messaging services and Apache Camel. Learn how to deploy such a solution and how to test the running integration using SNS, SQS, and a demo IBM MQ cluster environment running on Amazon Elastic Container Service (ECS) with AWS Fargate.

This solution can also be used as part of a step-by-step migration using the approach described in the blog post Migrating from IBM MQ to Amazon MQ using a phased approach.

Solution overview

The integration consists of an Apache Camel broker cluster that bi-directionally integrates an IBM MQ system and target systems, such as Amazon MQ running ActiveMQ, SNS topics, or SQS queues.

In the following example, AWS services, in this case AWS Lambda and SQS, receive messages published to IBM MQ via an SNS topic:

Solution architecture overview for sending messages

  1. The cloud message consumers (Lambda and SQS) subscribe to the solution’s target SNS topic.
  2. The Apache Camel broker connects to IBM MQ using secrets stored in AWS Secrets Manager and reads new messages from the queue using IBM MQ’s Java library. Only IBM MQ messages are supported as a source.
  3. The Apache Camel broker publishes these new messages to the target SNS topic. It uses the Amazon SNS Extended Client Library for Java to store any messages larger than 256 KB in an Amazon Simple Storage Service (Amazon S3) bucket.
  4. Apache Camel stores any message that cannot be delivered to SNS after two retries in an S3 dead letter queue bucket.

The next diagram demonstrates how the solution sends messages back from an SQS queue to IBM MQ:

Solution architecture overview for sending messages

  1. A sample message producer using Lambda sends messages to an SQS queue. It uses the Amazon SQS Extended Client Library for Java to send messages larger than 256 KB.
  2. The Apache Camel broker receives the messages published to SQS, using the SQS Extended Client Library if needed.
  3. The Apache Camel broker sends the message to the IBM MQ target queue.
  4. As before, the broker stores messages that cannot be delivered to IBM MQ in the S3 dead letter queue bucket.

A phased live migration consists of two steps:

  1. Deploy the broker service to allow reading messages from and writing to existing IBM MQ queues.
  2. Once the consumer or producer is migrated, migrate its counterpart to the newly selected service (SNS or SQS).

Next, you will learn how to set up the solution using the AWS Cloud Development Kit (AWS CDK).

Deploying the solution

Prerequisites

  • AWS CDK
  • TypeScript
  • Java
  • Docker
  • Git
  • Yarn

Step 1: Cloning the repository

Clone the repository using git:

git clone https://github.com/aws-samples/aws-ibm-mq-adapter

Step 2: Setting up test IBM MQ credentials

This demo uses IBM MQ’s mutual TLS authentication. To do this, you must generate X.509 certificates and store them in AWS Secrets Manager by running the following commands in the app folder:

  1. Generate X.509 certificates:
    ./deploy.sh generate_secrets
  2. Set up the secrets required for the Apache Camel broker (replace <integration-name> with, for example, dev):
    ./deploy.sh create_secrets broker <integration-name>
  3. Set up secrets for the mock IBM MQ system:
    ./deploy.sh create_secrets mock
  4. Update the cdk.json file with the secrets ARN output from the previous commands:
    • IBM_MOCK_PUBLIC_CERT_ARN
    • IBM_MOCK_PRIVATE_CERT_ARN
    • IBM_MOCK_CLIENT_PUBLIC_CERT_ARN
    • IBMMQ_TRUSTSTORE_ARN
    • IBMMQ_TRUSTSTORE_PASSWORD_ARN
    • IBMMQ_KEYSTORE_ARN
    • IBMMQ_KEYSTORE_PASSWORD_ARN

If you are using your own IBM MQ system and already have X.509 certificates available, you can use the script to upload those certificates to AWS Secrets Manager after running the script.

Step 3: Configuring the broker

The solution deploys two brokers, one to read messages from the test IBM MQ system and one to send messages back. A separate Apache Camel cluster is used per integration to support better use of Auto Scaling functionality and to avoid issues across different integration operations (consuming and reading messages).

Update the cdk.json file with the following values:

  • accountId: AWS account ID to deploy the solution to.
  • region: name of the AWS Region to deploy the solution to.
  • defaultVPCId: specify a VPC ID for an existing VPC in the AWS account where the broker and mock are deployed.
  • allowedPrincipals: add your account ARN (e.g., arn:aws:iam::123456789012:root) to allow this AWS account to send messages to and receive messages from the broker. You can use this parameter to set up cross-account relationships for both SQS and SNS integrations and support multiple consumers and producers.

Step 4: Bootstrapping and deploying the solution

  1. Make sure you have the correct AWS_PROFILE and AWS_REGION environment variables set for your development account.
  2. Run yarn cdk bootstrap –-qualifier mq <aws://<account-id>/<region> to bootstrap CDK.
  3. Run yarn install to install CDK dependencies.
  4. Finally, execute yarn cdk deploy '*-dev' –-qualifier mq --require-approval never to deploy the solution to the dev environment.

Step 5: Testing the integrations

Use AWS System Manager Session Manager and port forwarding to establish tunnels to the test IBM MQ instance to access the web console and send messages manually. For more information on port forwarding, see Amazon EC2 instance port forwarding with AWS System Manager.

  1. In a command line terminal, make sure you have the correct AWS_PROFILE and AWS_REGION environment variables set for your development account.
  2. In addition, set the following environment variables:
    • IBM_ENDPOINT: endpoint for IBM MQ. Example: network load balancer for IBM mock mqmoc-mqada-1234567890.elb.eu-west-1.amazonaws.com.
    • BASTION_ID: instance ID for the bastion host. You can retrieve this output from Step 4: Bootstrapping and deploying the solution listed after the mqBastionStack deployment.

    Use the following command to set the environment variables:

    export IBM_ENDPOINT=mqmoc-mqada-1234567890.elb.eu-west-1.amazonaws.com
    export BASTION_ID=i-0a1b2c3d4e5f67890
  3. Run the script test/connect.sh.
  4. Log in to the IBM web console via https://127.0.0.1:9443/admin using the default IBM user (admin) and the password stored in AWS Secrets Manager as mqAdapterIbmMockAdminPassword.

Sending data from IBM MQ and receiving it in SNS:

  1. In the IBM MQ console, access the local queue manager QM1 and DEV.QUEUE.1.
  2. Send a message with the content Hello AWS. This message will be processed by AWS Fargate and published to SNS.
  3. Access the SQS console and choose the snsIntegrationStack-dev-2 prefix queue. This is an SQS queue subscribed to the SNS topic for testing.
  4. Select Send and receive message.
  5. Select Poll for messages to see the Hello AWS message previously sent to IBM MQ.

Sending data back from Amazon SQS to IBM MQ:

  1. Access the SQS console and choose the queue with the prefix sqsPublishIntegrationStack-dev-3-dev.
  2. Select Send and receive messages.
  3. For Message Body, add Hello from AWS.
  4. Choose Send message.
  5. In the IBM MQ console, access the local queue manager QM1 and DEV.QUEUE.2 to find your message listed under this queue.

Step 6: Cleaning up

Run cdk destroy '*-dev' to destroy the resources deployed as part of this walkthrough.

Conclusion

In this blog, you learned how you can exchange messages between IBM MQ and your cloud applications using Amazon SQS and Amazon SNS.

If you’re interested in getting started with your own integration, follow the README file in the GitHub repository. If you’re migrating existing applications using industry-standard APIs and protocols such as JMS, NMS, or AMQP 1.0, consider integrating with Amazon MQ using the steps provided in the repository.

If you’re interested in running Apache Camel in Kubernetes, you can also adapt the architecture to use Apache Camel K instead.

For more serverless learning resources, visit Serverless Land.

Retrieving parameters and secrets with Powertools for AWS Lambda (TypeScript)

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/retrieving-parameters-and-secrets-with-powertools-for-aws-lambda-typescript/

This post is written by Andrea Amorosi, Senior Solutions Architect and Pascal Vogel, Solutions Architect.

When building serverless applications using AWS Lambda, you often need to retrieve parameters, such as database connection details, API secrets, or global configuration values at runtime. You can make these parameters available to your Lambda functions via secure, scalable, and highly available parameter stores, such as AWS Systems Manager Parameter Store or AWS Secrets Manager.

The Parameters utility for Powertools for AWS Lambda (TypeScript) simplifies the integration of these parameter stores inside your Lambda functions. The utility provides high-level functions for retrieving secrets and parameters, integrates caching and transformations, and reduces the amount of boilerplate code you must write.

The Parameters utility supports the following parameter stores:

The Parameters utility is part of the Powertools for AWS Lambda (TypeScript), which you can use in both JavaScript and TypeScript code bases. Implementing guidance from the Serverless Applications Lens of the AWS Well-Architected Framework, Powertools provides utilities to ease the adoption of best practices such as distributed tracing, structured logging, and asynchronous business and application metrics.

For more details, see the Powertools for AWS Lambda (TypeScript) documentation on GitHub and the introduction blog post.

This blog post shows how to use the new Parameters utility to retrieve parameters and secrets in your JavaScript and TypeScript Lambda functions securely.

Getting started with the Parameters utility

Initial setup

The Powertools toolkit is modular, meaning that you can install the Parameters utility independently from the Logger, Tracing, or Metrics packages. Install the Parameters utility library in your project via npm:

npm install @aws-lambda-powertools/parameters

In addition, you must add the AWS SDK client for the parameter store you are planning to use. The Parameters utility supports AWS SDK v3 for JavaScript only, which allows the utility to be modular. You install only the needed SDK packages to keep your bundle size small.

Next, assign appropriate AWS Identity and Access Management (IAM) permissions to the Lambda function execution role of your Lambda function that allow retrieving parameters from the parameter store.

The following sections illustrate how to perform the previously mentioned steps for some typical parameter retrieval scenarios.

Retrieving a single parameter from SSM Parameter Store

To retrieve parameters from SSM Parameter Store, install the AWS SDK client for SSM in addition to the Parameters utility:

npm install @aws-sdk/client-ssm

To retrieve an individual parameter, the Parameters utility provides the getParameter function:

import { getParameter } from '@aws-lambda-powertools/parameters/ssm';

export const handler = async (): Promise<void> => {
  // Retrieve a single parameter
  const parameter = await getParameter('/my/parameter');
  console.log(parameter);
};

Finally, you need to assign an IAM policy with the ssm:GetParameter permission to your Lambda function execution role. Apply the principle of least privilege by scoping the permission to the specific parameter resource as shown in the following policy example:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "ssm:GetParameter"
      ],
      "Resource": [
        "arn:aws:ssm:AWS_REGION:AWS_ACCOUNT_ID:my/parameter"
      ]
    }
  ]
}

Adjusting cache TTL

By default, the retrieved parameters are cached in-memory for 5 seconds. This cached value is used for further invocations of the Lambda function until it expires. If your application requires a different behavior, the Parameters utility allows you to adjust the time-to-live (TTL) via the maxAge argument.

Building on the previous example, if you want to cache your retrieved parameter for 30 instead of 5 seconds, you can adapt your function code as follows:

import { getParameter } from '@aws-lambda-powertools/parameters/ssm';

export const handler = async (): Promise<void> => {
  // Retrieve a single parameter with a 30 seconds cache TTL
  const parameter = await getParameter('/my/parameter', { maxAge: 30 });
  console.log(parameter);
};

In other cases, you may want to always retrieve the latest value from the parameter store and ignore any cached value. To achieve this, set the forceFetch parameter to true:

import { getParameter } from '@aws-lambda-powertools/parameters/ssm';

export const handler = async (): Promise<void> => {
  // Always retrieve the latest value of a single parameter
  const parameter = await getParameter('/my/parameter', { forceFetch: true });
  console.log(parameter);
};

For details, see Always fetching the latest in the Powertools for AWS Lambda (TypeScript) documentation.

Decoding parameters stored in JSON or base64 format

If some of your parameters are stored in base64 or JSON, you can deserialize them via the Parameters utility’s transform argument.

Considering a parameter stored in SSM as JSON, it can be retrieved and deserialized as follows:

import { Transform } from '@aws-lambda-powertools/parameters';
import { getParameter } from '@aws-lambda-powertools/parameters/ssm';

export const handler = async (): Promise => {
  // Retrieve and deserialize a single JSON parameter
  const valueFromJson = await getParameter('/my/json/parameter', { transform: Transform.JSON });
  console.log(valueFromJson);
};

The Parameters utility supports the transform argument for all parameter store providers and high-level functions. For details, see Deserializing values with transform parameters.

Working with encrypted parameters in SSM Parameter Store

SSM Parameter Store supports encrypted secure string parameters via the AWS Key Management Service (AWS KMS). The Parameters utility allows you to retrieve these encrypted parameters by adding the decrypt argument to your request.

For example, you could retrieve an encrypted parameter as follows:

import { getParameter } from '@aws-lambda-powertools/parameters/ssm';

export const handler = async (): Promise<void> => {
  // Decrypt the parameter
  const decryptedParameter = await getParameter('/my/encrypted/parameter', { decrypt: true });
  console.log(decryptedParameter);
};

In this case, the Lambda function execution role needs to have the kms:Decrypt IAM permission in addition to ssm:GetParameter.

Retrieving multiple parameters from SSM Parameter Store

Besides retrieving a single parameter using getParameter, you can also use getParameters to recursively retrieve multiple parameters under a SSM Parameter Store path, or getParametersByName to retrieve multiple distinct parameters by their full name.

You can also apply custom caching, transform, or decrypt configurations per parameter when using getParametersByName. The following example retrieves three distinct parameters from SSM Parameter Store with different caching and transform configurations:

import { getParametersByName } from '@aws-lambda-powertools/parameters/ssm';
import type {
  SSMGetParametersByNameOptionsInterface
} from '@aws-lambda-powertools/parameters/ssm/types';

const props: Record<string, SSMGetParametersByNameOptionsInterface> = {
  '/develop/service/commons/telemetry/config': { maxAge: 300, transform: 'json' },
  '/no_cache_param': { maxAge: 0 },
  '/develop/service/payment/api/capture/url': {}, // When empty or undefined, it uses default values
};

export const handler = async (): Promise<void> => {
  // This returns an object with the parameter name as key
  const parameters = await getParametersByName(props);
  for (const [ key, value ] of Object.entries(parameters)) {
    console.log(`${key}: ${value}`);
  }
};

Retrieving multiple parameters requires the GetParameter and GetParameters permissions to be present in the Lambda function execution role.

Retrieving secrets from Secrets Manager

To securely store sensitive parameters such as passwords or API keys for external services, Secrets Manager is a suitable option. To retrieve secrets from Secrets Manager using the Parameters utility, install the AWS SDK client for Secrets Manager in addition to the Parameters utility:

npm install @aws-sdk/client-secrets-manager

Now you can access a secret using its key as follows:

import { getSecret } from '@aws-lambda-powertools/parameters/secrets';

export const handler = async (): Promise<void> => {
  // Retrieve a single secret
  const secret = await getSecret('my-secret');
  console.log(secret);
};

Getting a secret from Secrets Manager requires you to add the secretsmanager:GetSecretValue IAM permission to your Lambda function execution role.

Retrieving an application configuration from AppConfig

If you plan to leverage feature flags or dynamic application configurations in your applications built on Lambda, AppConfig is a suitable option. The Parameters utility makes it easy to fetch configurations from AppConfig while benefitting from utility features such as caching and transformations.

For example, considering an AppConfig application called my-app with an environment called my-env, you can retrieve its configuration profile my-configuration as follows:

import { getAppConfig } from '@aws-lambda-powertools/parameters/appconfig';

export const handler = async (): Promise<void> => {
  // Retrieve a configuration, latest version
  const config = await getAppConfig('my-configuration', {
    environment: 'my-env',
    application: 'my-app'
  });
  console.log(config);
};

Retrieving a configuration requires both the appconfig:GetLatestConfiguration and appconfig:StartConfigurationSession IAM permissions to be attached to the Lambda function execution role.

Retrieving a parameter from a DynamoDB table

DynamoDB’s low latency and high flexibility make it a great option for storing parameters. To use DynamoDB as a parameter store via the Parameters utility, install the DynamoDB AWS SDK client and utility package in addition to the Parameters utility.

npm install @aws-sdk/client-dynamodb @aws-sdk/util-dynamodb

By default, the Parameters utility expects the DynamoDB table containing the parameters to have a partition key of id and an attribute called value. For example, assuming an item with an id of my-parameter and a value of my-value stored in an DynamoDB table called my-table, you can retrieve it as follows:

import { DynamoDBProvider } from '@aws-lambda-powertools/parameters/dynamodb';

const dynamoDBProvider = new DynamoDBProvider({ tableName: 'my-table' });

export const handler = async (): Promise<void> => {
  // Retrieve a value from DynamoDB
  const value = await dynamoDBProvider.get('my-parameter');
  console.log(value);
};

In case of retrieving a single parameter from DynamoDB, the Lambda function execution role needs to have the dynamodb:GetItem IAM permission.

The Parameters utility DynamoDB provider can also retrieve multiple parameters from a table with a single request via a DynamoDB query. See DynamoDB provider in the Powertools for AWS Lambda (TypeScript) documentation for details.

Conclusion

This blog post introduces the Powertools for AWS Lambda (TypeScript) Parameters utility and demonstrates how it is used with different parameter stores. The Parameters utility allows you to retrieve secrets and parameters in your Lambda function from SSM Parameter Store, Secrets Manager, AppConfig, DynamoDB, and custom parameter stores. By using the utility, you get access to functionality such as caching and transformation, and reduce the amount of boilerplate code you need to write for your Lambda functions.

To learn more about the Parameters utility and its full set of functionality, take a look at the Powertools for AWS Lambda (TypeScript) documentation.

Share your feedback for Powertools for AWS Lambda (TypeScript) by opening a GitHub issue.

For more serverless learning resources, visit Serverless Land.

Implementing AWS Well-Architected best practices for Amazon SQS – Part 3

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/implementing-aws-well-architected-best-practices-for-amazon-sqs-part-3/

This blog is written by Chetan Makvana, Senior Solutions Architect and Hardik Vasa, Senior Solutions Architect.

This is the third part of a three-part blog post series that demonstrates best practices for Amazon Simple Queue Service (Amazon SQS) using the AWS Well-Architected Framework.

This blog post covers best practices using the Performance Efficiency Pillar, Cost Optimization Pillar, and Sustainability Pillar. The inventory management example introduced in part 1 of the series will continue to serve as an example.

See also the other two parts of the series:

Performance Efficiency Pillar

The Performance Efficiency Pillar includes the ability to use computing resources efficiently to meet system requirements, and to maintain that efficiency as demand changes and technologies evolve. It recommends best practices to use trade-offs to improve performance, such as learning about design patterns and services and identify how tradeoffs impact customers and efficiency.

By adopting these best practices, you can optimize the performance of SQS by employing appropriate configurations and techniques while considering trade-offs for the specific use case.

Best practice: Use action batching or horizontal scaling or both to increase throughput

For achieving high throughput in SQS, optimizing the performance of your message processing is crucial. You can use two techniques: horizontal scaling and action batching.

When dealing with high message volume, consider horizontally scaling the message producers and consumers by increasing the number of threads per client, by adding more clients, or both. By distributing the load across multiple threads or clients, you can handle a high number of messages concurrently.

Action batching distributes the latency of the batch action over the multiple messages in a batch request, rather than accepting the entire latency for a single message. Because each round trip carries more work, batch requests make more efficient use of threads and connections, improving throughput. You can combine batching with horizontal scaling to provide throughput with fewer threads, connections, and requests than individual message requests.

In the inventory management example that we introduced in part 1, this scaling behavior is managed by AWS for the AWS Lambda function responsible for backend processing. When a Lambda function subscribes to an SQS queue, Lambda polls the queue as it waits for the inventory updates requests to arrive. Lambda consumes messages in batches, starting at five concurrent batches with five functions at a time. If there are more messages in the queue, Lambda adds up to 60 functions per minute, up to 1,000 functions, to consume those messages.

This means that Lambda can scale up to 1,000 concurrent Lambda functions processing messages from the SQS queue. Batching enables the inventory management system to handle a high volume of inventory update messages efficiently. This ensures real-time visibility into inventory levels and enhances the accuracy and responsiveness of inventory management operations.

Best practice: Trade-off between SQS standard and First-In-First-Out (FIFO) queues

SQS supports two types of queues: standard queues and FIFO queues. Understanding the trade-offs between SQS standard and FIFO queues allows you to make an informed choice that aligns with your application’s requirements and priorities. While SQS standard queues support a nearly unlimited throughput, it sacrifices strict message ordering and occasionally delivers messages in an order different from the one they were sent in. If maintaining the exact order of events is not critical for your application, utilizing SQS standard queues can provide significant benefits in terms of throughput and scalability.

On the other hand, SQS FIFO queues guarantee message ordering and exactly-once processing. This makes them suitable for applications where maintaining the order of events is crucial, such as financial transactions or event-driven workflows. However, FIFO queues have a lower throughput compared to standard queues. They can handle up to 3,000 transactions per second (TPS) per API method with batching, and 300 TPS without batching. Consider using FIFO queues only when the order of events is important for the application, otherwise use standard queues.

In the inventory management example, since the order of inventory records is not crucial, the potential out-of-order message delivery that can occur with SQS standard queues is unlikely to impact the inventory processing. This allows you to take advantage of the benefits provided by SQS standard queues, including their ability to handle a high number of transactions per second.

Cost Optimization Pillar

The Cost Optimization Pillar includes the ability to run systems to deliver business value at the lowest price. It recommends best practices to build and operate cost-aware workloads that achieve business outcomes while minimizing costs and allowing your organization to maximize its return on investment.

Best practice: Configure cost allocation tags for SQS to organize and identify SQS for cost allocation

A well-defined tagging strategy plays a vital role in establishing accurate chargeback or showback models. By assigning appropriate tags to resources, such as SQS queues, you can precisely allocate costs to different teams or applications. This level of granularity ensures fair and transparent cost allocation, enabling better financial management and accountability.

In the inventory management example, tagging the SQS queue allows for specific cost tracking under the Inventory department, enabling a more accurate assessment of expenses. The following code snippet shows how to tag the SQS queue using AWS Could Development Kit (AWS CDK).

# Create the SQS queue with DLQ setting
queue = sqs.Queue(
    self,
    "InventoryUpdatesQueue",
    visibility_timeout=Duration.seconds(300),
)

Tags.of(queue).add("department", "inventory")

Best practice: Use long polling

SQS offers two methods for receiving messages from a queue: short polling and long polling. By default, queues use short polling, where the ReceiveMessage request queries a subset of servers to identify available messages. Even if the query found no messages, SQS sends the response right away.

In contrast, long polling queries all servers in the SQS infrastructure to check for available messages. SQS responds only after collecting at least one message, respecting the specified maximum. If no messages are immediately available, the request is held open until a message becomes available or the polling wait time expires. In such cases, an empty response is sent.

Short polling provides immediate responses, making it suitable for applications that require quick feedback or near-real-time processing. On the other hand, long polling is ideal when efficiency is prioritized over immediate feedback. It reduces API calls, minimizes network traffic, and improves resource utilization, leading to cost savings.

In the inventory management example, long polling enhances the efficiency of processing inventory updates. It collects and retrieves available inventory update messages in a batch of 10, reducing the frequency of API requests. This batching approach optimizes resource utilization, minimizes network traffic, and reduces excessive API consumption, resulting in cost savings. You can configure this behavior using batch size and batch window:

# Add the SQS queue as a trigger to the Lambda function
sqs_to_dynamodb_function.add_event_source_mapping(
    "MyQueueTrigger", event_source_arn=queue.queue_arn, batch_size=10
)

Best practice: Use batching

Batching messages together allows you to send or retrieve multiple messages in a single API call. This reduces the number of API requests required to process or retrieve messages compared to sending or retrieving messages individually. Since SQS pricing is based on the number of API requests, reducing the number of requests can lead to cost savings.

To send, receive, and delete messages, and to change the message visibility timeout for multiple messages with a single action, use Amazon SQS batch API actions. This also helps with transferring less data, effectively reducing the associated data transfer costs, especially if you have many messages.

In the context of the inventory management example, the CSV processing Lambda function groups 10 inventory records together in each API call, forming a batch. By doing so, the number of API requests is reduced by a factor of 10 compared to sending each record separately. This approach optimizes the utilization of API resources, streamlines message processing, and ultimately contributes to cost efficiency. Following is the code snippet from the CSV processing Lambda function showcasing the use of SendMessageBatch to send 10 messages with a single action.

# Parse the CSV records and send them to SQS as batch messages
csv_reader = csv.DictReader(csv_content.splitlines())
message_batch = []
for row in csv_reader:
    # Convert the row to JSON
    json_message = json.dumps(row)

    # Add the message to the batch
    message_batch.append(
        {"Id": str(len(message_batch) + 1), "MessageBody": json_message}
    )

    # Send the batch of messages when it reaches the maximum batch size (10 messages)
    if len(message_batch) == 10:
        sqs_client.send_message_batch(QueueUrl=queue_url, Entries=message_batch)
        message_batch = []
        print("Sent messages in batch")

Best practice: Use temporary queues

In case of short-lived, lightweight messaging with synchronous two-way communication, you can use temporary queues. The temporary queue makes it easy to create and delete many temporary messaging destinations without inflating your AWS bill. The key concept behind this is the virtual queue. Virtual queues let you multiplex many low-traffic queues onto a single SQS queue. Creating a virtual queue only instantiates a local buffer to hold messages for consumers as they arrive; there is no API call to SQS, and no costs associated with creating a virtual queue.

The inventory management example does not use temporary queues. However, in use cases that involve short-lived, lightweight messaging with synchronous two-way communication, adopting the best practice of using temporary queues and virtual queues can enhance the overall efficiency, reduce costs, and simplify the management of messaging destinations.

Sustainability Pillar

The Sustainability Pillar provides best practices to meet sustainability targets for your AWS workloads. It encompasses considerations related to energy efficiency and resource optimization.

Best practice: Use long polling

Besides its cost optimization benefits explained as part of the Cost Optimization Pillar, long polling also plays a crucial role in improving resource efficiency by reducing API requests, minimizing network traffic, and optimizing resource utilization.

By collecting and retrieving available messages in a batch, long polling reduces the frequency of API requests, resulting in improved resource utilization and minimized network traffic. By reducing excessive API consumption through long polling, you can effectively use resources. It collects and retrieves messages in batches, reducing excessive API consumption and unnecessary network traffic.

By reducing API calls, it optimizes data transfer and infrastructure operations. Additionally, long polling’s batching approach optimizes resource allocation, utilizing system resources more efficiently and improving energy efficiency. This enables the inventory management system to handle high message volumes effectively while operating in a cost-efficient and resource-efficient manner.

Conclusion

This blog post explores best practices for SQS using the Performance Efficiency Pillar, Cost Optimization Pillar, and Sustainability Pillar of the AWS Well-Architected Framework. We cover techniques such as batch processing, message batching, and scaling considerations. We also discuss important considerations, such as resource utilization, minimizing resource waste, and reducing cost.

This three-part blog post series covers a wide range of best practices, spanning the Operational Excellence, Security, Reliability, Performance Efficiency, Cost Optimization, and Sustainability Pillars of the AWS Well-Architected Framework. By following these guidelines and leveraging the power of the AWS Well-Architected Framework, you can build robust, secure, and efficient messaging systems using SQS.

For more serverless learning resources, visit Serverless Land.

Implementing AWS Well-Architected best practices for Amazon SQS – Part 2

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/implementing-aws-well-architected-best-practices-for-amazon-sqs-part-2/

This blog is written by Chetan Makvana, Senior Solutions Architect and Hardik Vasa, Senior Solutions Architect.

This is the second part of a three-part blog post series that demonstrates implementing best practices for Amazon Simple Queue Service (Amazon SQS) using the AWS Well-Architected Framework.

This blog post covers best practices using the Security Pillar and Reliability Pillar of the AWS Well-Architected Framework. The inventory management example introduced in part 1 of the series will continue to serve as an example.

See also the other two parts of the series:

Security Pillar

The Security Pillar includes the ability to protect data, systems, and assets and to take advantage of cloud technologies to improve your security. This pillar recommends putting in place practices that influence security. Using these best practices, you can protect data while in-transit (as it travels to and from SQS) and at rest (while stored on disk in SQS), or control who can do what with SQS.

Best practice: Configure server-side encryption

If your application has a compliance requirement such as HIPAA, GDPR, or PCI-DSS mandating encryption at rest, if you are looking to improve data security to protect against unauthorized access, or if you are just looking for simplified key management for the messages sent to the SQS queue, you can leverage Server-Side Encryption (SSE) to protect the privacy and integrity of your data stored on SQS.

SQS and AWS Key Management Service (KMS) offer two options for configuring server-side encryption. SQS-managed encryptions keys (SSE-SQS) provide automatic encryption of messages stored in SQS queues using AWS-managed keys. This feature is enabled by default when you create a queue. If you choose to use your own AWS KMS keys to encrypt and decrypt messages stored in SQS, you can use the SSE-KMS feature.

Amazon SQS Encryption Settings

SSE-KMS provides greater control and flexibility over encryption keys, while SSE-SQS simplifies the process by managing the encryption keys for you. Both options help you protect sensitive data and comply with regulatory requirements by encrypting data at rest in SQS queues. Note that SSE-SQS only encrypts the message body and not the message attributes.

In the inventory management example introduced in part 1, an AWS Lambda function responsible for CSV processing sends incoming messages to an SQS queue when an inventory updates file is dropped into the Amazon Simple Storage Service (Amazon S3) bucket. SQS encrypts these messages in the queue using SQS-SSE. When a backend processing Lambda polls messages from the queue, the encrypted message is decrypted, and the function inserts inventory updates into Amazon DynamoDB.

The AWS Could Development Kit (AWS CDK) code sets SSE-SQS as the default encryption key type. However, the following AWS CDK code shows how to encrypt the queue with SSE-KMS.

# Create the SQS queue with DLQ setting
queue = sqs.Queue(
    self,
    "InventoryUpdatesQueue",
    visibility_timeout=Duration.seconds(300),
    encryption=sqs.QueueEncryption.KMS_MANAGED,
)

Best practice: Implement least-privilege access using access policy

For securing your resources in AWS, implementing least-privilege access is critical. This means granting users and services the minimum level of access required to perform their tasks. Least-privilege access provides better security, allows you to meet your compliance requirements, and offers accountability via a clear audit trail of who accessed what resources and when.

By implementing least-privilege access using access policies, you can help reduce the risk of security breaches and ensure that your resources are only accessed by authorized users and services. AWS Identity and Access Management (IAM) policies apply to users, groups, and roles, while resource-based policies apply to AWS resources such as SQS queues. To implement least-privilege access, it’s essential to start by defining what actions are required for each user or service to perform their tasks.

In the inventory management example, the CSV processing Lambda function doesn’t perform any other task beyond parsing the inventory updates file and sending the inventory records to the SQS queue for further processing. To ensure that the function has the permissions to send messages to the SQS queue, grant the SQS queue access to the IAM role that the Lambda function assumes. By granting the SQS queue access to the Lambda function’s IAM role, you establish a secure and controlled communication channel. The Lambda function can only interact with the SQS queue and doesn’t have unnecessary access or permissions that might compromise the system’s security.

# Create pre-processing Lambda function
csv_processing_to_sqs_function = _lambda.Function(
    self,
    "CSVProcessingToSQSFunction",
    runtime=_lambda.Runtime.PYTHON_3_8,
    code=_lambda.Code.from_asset("sqs_blog/lambda"),
    handler="CSVProcessingToSQSFunction.lambda_handler",
    role=role,
    tracing=Tracing.ACTIVE,
)

# Define the queue policy to allow messages from the Lambda function's role only
policy = iam.PolicyStatement(
    actions=["sqs:SendMessage"],
    effect=iam.Effect.ALLOW,
    principals=[iam.ArnPrincipal(role.role_arn)],
    resources=[queue.queue_arn],
)

queue.add_to_resource_policy(policy)

Best practice: Allow only encrypted connections over HTTPS using aws:SecureTransport

It is essential to have a secure and reliable method for transferring data between AWS services and on-premises environments or other external systems. With HTTPS, a network-based attacker cannot eavesdrop on network traffic or manipulate it, using an attack such as man-in-the-middle.

With SQS, you can choose to allow only encrypted connections over HTTPS using the aws:SecureTransport condition key in the queue policy. With this condition in place, any requests made over non-secure HTTP receive a 400 InvalidSecurity error from SQS.

In the inventory management example, the CSV processing Lambda function sends inventory updates to the SQS queue. To ensure secure data transfer, the Lambda function uses the HTTPS endpoint provided by SQS. This guarantees that the communication between the Lambda function and the SQS queue remains encrypted and resistant to potential security threats.

# Create an IAM policy statement allowing only HTTPS access to the queue
secure_transport_policy = iam.PolicyStatement(
    effect=iam.Effect.DENY,
    actions=["sqs:*"],
    resources=[queue.queue_arn],
    conditions={
        "Bool": {
            "aws:SecureTransport": "false",
        },
    },
)

Best practice: Use attribute-based access controls (ABAC)

Some use-cases require granular access control. For example, authorizing a user based on user roles, environment, department, or location. Additionally, dynamic authorization is required based on changing user attributes. In this case, you need an access control mechanism based on user attributes.

Attribute-based access controls (ABAC) is an authorization strategy that defines permissions based on tags attached to users and AWS resources. With ABAC, you can use tags to configure IAM access permissions and policies for your queues. ABAC hence enables you to scale your permission management easily. You can author a single permission policy in IAM using tags created for each business role, and no longer need to update the policy when adding new resources.

ABAC for SQS queues enables two key use cases:

  • Tag-based access control: use tags to control access to your SQS queues, including control plane and data plane API calls.
  • Tag-on-create: enforce tags at the time of creation of an SQS queues and deny the creation of SQS resources without tags.

Reliability Pillar

The Reliability Pillar encompasses the ability of a workload to perform its intended function correctly and consistently when it’s expected to. By leveraging the best practices outlined in this pillar, you can enhance the way you manage messages in SQS.

Best practice: Configure dead-letter queues

In a distributed system, when messages flow between sub-systems, there is a possibility that some messages may not be processed right away. This could be because of the message being corrupted or downstream processing being temporarily unavailable. In such situations, it is not ideal for the bad message to block other messages in the queue.

Dead Letter Queues (DLQs) in SQS can improve the reliability of your application by providing an additional layer of fault tolerance, simplifying debugging, providing a retry mechanism, and separating problematic messages from the main queue. By incorporating DLQs into your application architecture, you can build a more robust and reliable system that can handle errors and maintain high levels of performance and availability.

In the inventory management example, a DLQ plays a vital role in adding message resiliency and preventing situations where a single bad message blocks the processing of other messages. If the backend Lambda function fails after multiple attempts, the inventory update message is redirected to the DLQ. By inspecting these unconsumed messages, you can troubleshoot and redrive them to the primary queue or to custom destination using the DLQ redrive feature. You can also automate redrive by using a set of APIs programmatically. This ensures accurate inventory updates and prevents data loss.

The following AWS CDK code snippet shows how to create a DLQ for the source queue and sets up a DLQ policy to only allow messages from the source SQS queue. It is recommended not to set the max_receive_count value to 1, especially when using a Lambda function as the consumer, to avoid accumulating many messages in the DLQ.

# Create the Dead Letter Queue (DLQ)
dlq = sqs.Queue(self, "InventoryUpdatesDlq", visibility_timeout=Duration.seconds(300))

# Create the SQS queue with DLQ setting
queue = sqs.Queue(
    self,
    "InventoryUpdatesQueue",
    visibility_timeout=Duration.seconds(300),
    dead_letter_queue=sqs.DeadLetterQueue(
        max_receive_count=3,  # Number of retries before sending the message to the DLQ
        queue=dlq,
    ),
)
# Create an SQS queue policy to allow source queue to send messages to the DLQ
policy = iam.PolicyStatement(
    effect=iam.Effect.ALLOW,
    actions=["sqs:SendMessage"],
    resources=[dlq.queue_arn],
    conditions={"ArnEquals": {"aws:SourceArn": queue.queue_arn}},
)
queue.queue_policy = iam.PolicyDocument(statements=[policy])

Best practice: Process messages in a timely manner by configuring the right visibility timeout

Setting the appropriate visibility timeout is crucial for efficient message processing in SQS. The visibility timeout is the period during which SQS prevents other consumers from receiving and processing a message after it has been polled from the queue.

To determine the ideal visibility timeout for your application, consider your specific use case. If your application typically processes messages within a few seconds, set the visibility timeout to a few minutes. This ensures that multiple consumers don’t process the message simultaneously. If your application requires more time to process messages, consider breaking them down into smaller units or batching them to improve performance.

If a message fails to process and is returned to the queue, it will not be available for processing again until the visibility timeout period has elapsed. Increasing the visibility timeout will increase the overall latency of your application. Therefore, it’s important to balance the tradeoff between reducing the likelihood of message duplication and maintaining a responsive application.

In the inventory management example, setting the right visibility timeout helps the application fail fast and improve the message processing times. Since the Lambda function typically processes messages within milliseconds, a visibility timeout of 30 seconds is set in the following AWS CDK code snippet.

queue = sqs.Queue(
    self,
    " InventoryUpdatesQueue",
    visibility_timeout=Duration.seconds(30),
)

It is recommended to keep the SQS queue visibility timeout to at least six times the Lambda function timeout, plus the value of MaximumBatchingWindowInSeconds. This allows Lambda function to retry the messages if the invocation fails.

Conclusion

This blog post explores best practices for SQS using the Security Pillar and Reliability Pillar of the AWS Well-Architected Framework. We discuss various best practices and considerations to ensure the security of SQS. By following these best practices, you can create a robust and secure messaging system using SQS. We also highlight fault tolerance and processing a message in a timely manner as important aspects of building reliable applications using SQS.

The next part of this blog post series focuses on the Performance Efficiency Pillar, Cost Optimization Pillar, and Sustainability Pillar of the AWS Well-Architected Framework and explore best practices for SQS.

For more serverless learning resources, visit Serverless Land.

Implementing AWS Well-Architected best practices for Amazon SQS – Part 1

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/implementing-aws-well-architected-best-practices-for-amazon-sqs-part-1/

This blog is written by Chetan Makvana, Senior Solutions Architect and Hardik Vasa, Senior Solutions Architect.

Amazon Simple Queue Service (Amazon SQS) is a fully managed message queuing service that makes it easy to decouple and scale microservices, distributed systems, and serverless applications. AWS customers have constantly discovered powerful new ways to build more scalable, elastic, and reliable applications using SQS. You can leverage SQS in a variety of use-cases requiring loose coupling and high performance at any level of throughput, while reducing cost by only paying for value and remaining confident that no message is lost. When building applications with Amazon SQS, it is important to follow architectural best practices.

To help you identify and implement these best practices, AWS provides the AWS Well-Architected Framework for designing and operating reliable, secure, efficient, cost-effective, and sustainable systems in the AWS Cloud. Built around six pillars—operational excellence, security, reliability, performance efficiency, cost optimization, and sustainability, AWS Well-Architected provides a consistent approach for customers and partners to evaluate architectures and implement scalable designs.

This three-part blog series covers each pillar of the AWS Well-Architected Framework to implement best practices for SQS. This blog post, part 1 of the series, discusses best practices using the Operational Excellence Pillar of the AWS Well-Architected Framework.

See also the other two parts of the series:

Solution overview

Solution architecture for Inventory Updates Process

This solution architecture shows an example of an inventory management system. The system leverages Amazon Simple Storage Service (Amazon S3), AWS Lambda, Amazon SQS, and Amazon DynamoDB to streamline inventory operations and ensure accurate inventory levels. The system handles frequent updates from multiple sources, such as suppliers, warehouses, and retail stores, which are received as CSV files.

These CSV files are then uploaded to an S3 bucket, consolidating and securing the inventory data for the inventory management system’s access. The system uses a Lambda function to read and parse the CSV file, extracting individual inventory update records. The backend Lambda function transforms each inventory update record into a message and sends it to an SQS queue. Another Lambda function continually polls the SQS queue for new messages. Upon receiving a message, it retrieves the inventory update details and updates the inventory levels in DynamoDB accordingly.

This ensures that the inventory quantities for each product are accurate and reflect the latest changes. This way, the inventory management system provides real-time visibility into inventory levels across different locations and suppliers, enabling the company to monitor product availability with precision. Find the example code for this solution in the GitHub repository.

This example is used throughout this blog series to highlight how SQS best practices can be implemented based on the AWS Well Architected Framework.

Operational Excellence Pillar

The Operational Excellence Pillar includes the ability to support development and run workloads effectively, gain insight into their operation, and continuously improve supporting processes and procedures to deliver business value. To achieve operational excellence, the pillar recommends best practices such as defining workload metrics and implementing transaction traceability. This enables organizations to gain valuable insights into their operations, identify potential issues, and optimize services accordingly to improve customer experience. Furthermore, understanding the health of an application is critical to ensuring that it is functioning as expected.

Best practice: Use infrastructure as code to deploy SQS

Infrastructure as Code (IaC) helps you model, provision, and manage your cloud resources. One of the primary advantages of IaC is that it simplifies infrastructure management. With IaC, you can quickly and easily replicate your environment to multiple AWS Regions with a single turnkey solution. This makes it easy to manage your infrastructure, regardless of where your resources are located. Additionally, IaC enables you to create, deploy, and maintain infrastructure in a programmatic, descriptive, and declarative way repeatably. This reduces errors caused by manual processes, such as creating resources in the AWS Management Console. With IaC, you can easily control and track changes in your infrastructure, which makes it easier to maintain and troubleshoot your systems.

For managing SQS resources, you can use different IaC tools like AWS Serverless Application Model (AWS SAM), AWS CloudFormation, or AWS Could Development Kit (AWS CDK). There are also third-party solutions for creating SQS resources, such as the Serverless Framework. AWS CDK is a popular choice because it allows you to provision AWS resources using familiar programming languages such as Python, Java, TypeScript, Go, JavaScript, and C#/.Net.

This blog series showcases the use of AWS CDK with Python to demonstrate best practices for working with SQS. For example, the following AWS CDK code creates a new SQS queue:

from aws_cdk import (
    Duration,
    Stack,
    aws_sqs as sqs,
)
from constructs import Construct


class SqsCdBlogStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # The code that defines your stack goes here

        # example resource
        queue = sqs.Queue(
            self,
            "InventoryUpdatesQueue",
            visibility_timeout=Duration.seconds(300),
        )

Best practice: Configure CloudWatch alarms for ApproximateAgeofOldestMessage

It is important to understand Amazon CloudWatch metrics and dimensions for SQS, to have a plan in place to assess its behavior, and to add custom metrics where necessary. Once you have a good understanding of the metrics, it is essential to identify the key metrics that are most relevant to your use case and set up appropriate alerts to monitor them.

One of the key metrics that SQS provides is the ApproximateAgeOfOldestMessage metric. By monitoring this metric, you can determine the age of the oldest message in the queue, and take appropriate action to ensure that messages are processed in a timely manner. To set up alerts for the ApproximateAgeOfOldestMessage metric, you can use CloudWatch alarms. You configure these alarms to issue alerts when messages remain in the queue for extended periods of time. You can use these alerts to act, for instance by scaling up consumers to process messages more quickly or investigating potential issues with message processing.

In the inventory management example, leveraging the ApproximateAgeOfOldestMessage metric provides valuable insights into the health and performance of the SQS queue. By monitoring this metric, you can detect processing delays, optimize performance, and ensure that inventory updates are processed within the desired timeframe. This ensures that your inventory levels remain accurate and up-to-date. The following code creates an alarm which is triggered if the oldest inventory updates request is in the queue for more than 30 seconds.

# Create a CloudWatch alarm for ApproximateAgeOfOldestMessage metric
alarm = cloudwatch.Alarm(
	self,
	"OldInventoryUpdatesAlarm",
	alarm_name="OldInventoryUpdatesAlarm",
	metric=queue.metric_approximate_age_of_oldest_message(),
	threshold=600,  # Specify your desired threshold value in seconds
	evaluation_periods=1,
	comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
)

Best practice: Add a tracing header while sending a message to the queue to provide distributed tracing capabilities for faster troubleshooting

By implementing distributed tracing, you can gain a clear understanding of the flow of messages in SQS queues, identify any bottlenecks or potential issues, and proactively react to any signals that indicate an unhealthy state. Tracing provides a wider continuous view of an application and helps to follow a user journey or transaction through the application.

AWS X-Ray is an example of a distributed tracing solution that integrates with Amazon SQS to trace messages that are passed through an SQS queue. When using the X-Ray SDK, SQS can propagate tracing headers to maintain trace continuity and enable tracking, analysis, and debugging throughout downstream services. SQS supports tracing headers through the Default HTTP header and the AWSTraceHeader System Attribute. AWSTraceHeader is available for use even when auto-instrumentation through the X-Ray SDK is not, for example, when building a tracing SDK for a new language. If you are using a Lambda downstream consumer, trace context propagation is automatic.

In the inventory management example, by utilizing distributed tracing with X-Ray for SQS, you can gain deep insights into the performance, behavior, and dependencies of the inventory management system. This visibility enables you to optimize performance, troubleshoot issues more effectively, and ensure the smooth and efficient operation of the system. The following code sets up a CSV processing Lambda function and a backend processing Lambda function with active tracing enabled. The Lambda function automatically receives the X-Ray TraceId from SQS.

# Create pre-processing Lambda function
csv_processing_to_sqs_function = _lambda.Function(
    self,
    "CSVProcessingToSQSFunction",
    runtime=_lambda.Runtime.PYTHON_3_8,
    code=_lambda.Code.from_asset("sqs_blog/lambda"),
    handler="CSVProcessingToSQSFunction.lambda_handler",
    role=role,
    tracing=Tracing.ACTIVE,  # Enable active tracing with X-Ray
)

# Create a post-processing Lambda function with the specified role
sqs_to_dynamodb_function = _lambda.Function(
    self,
    "SQSToDynamoDBFunction",
    runtime=_lambda.Runtime.PYTHON_3_8,
    code=_lambda.Code.from_asset("sqs_blog/lambda"),
    handler="SQSToDynamoDBFunction.lambda_handler",
    role=role,
    tracing=Tracing.ACTIVE,  # Enable active tracing with X-Ray
)

Conclusion

This blog post explores best practices for SQS with a focus on the Operational Excellence Pillar of the AWS Well-Architected Framework. We explore key considerations for ensuring the smooth operation and optimal performance of applications using SQS. Additionally, we explore the advantages of infrastructure as code in simplifying infrastructure management and showcase how AWS CDK can be used to provision and manage SQS resources.

The next part of this blog post series addresses the Security Pillar and Reliability Pillar of the AWS Well-Architected Framework and explores best practices for SQS.

For more serverless learning resources, visit Serverless Land.

Monitor Amazon SNS-based applications end-to-end with AWS X-Ray active tracing

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/monitor-amazon-sns-based-applications-end-to-end-with-aws-x-ray-active-tracing/

This post is written by Daniel Lorch, Senior Consultant and David Mbonu, Senior Solutions Architect.

Amazon Simple Notification Service (Amazon SNS), a messaging service that provides high-throughput, push-based, many-to-many messaging between distributed systems, microservices, and event-driven serverless applications, now supports active tracing with AWS X-Ray.

With AWS X-Ray active tracing enabled for SNS, you can identify bottlenecks and monitor the health of event-driven applications by looking at segment details for SNS topics, such as resource metadata, faults, errors, and message delivery latency for each subscriber.

This blog post reviews common use cases where AWS X-Ray active tracing enabled for SNS provides a consistent view of tracing data across AWS services in real-world scenarios. We cover two architectural patterns which allow you to gain accurate visibility of your end-to-end tracing: SNS to Amazon Simple Queue Service (Amazon SQS) queues and SNS topics to Amazon Kinesis Data Firehose streams.

Getting started with the sample serverless application

To demonstrate AWS X-Ray active tracing for SNS, we will use the Wild Rydes serverless application as shown in the following figure. The application uses a microservices architecture which implements asynchronous messaging for integrating independent systems.

Wild Rydes serverless application architecture

This is how the sample serverless application works:

  1. An Amazon API Gateway receives ride requests from users.
  2. An AWS Lambda function processes ride requests.
  3. An Amazon DynamoDB table serves as a store for rides.
  4. An SNS topic serves as a fan-out for ride requests.
  5. Individual SQS queues and Lambda functions are set up for processing requests via various back-office services (customer notification, customer accounting, and others).
  6. An SNS message filter is in place for the subscription of the extraordinary rides service.
  7. A Kinesis Data Firehose delivery stream archives ride requests in an Amazon Simple Storage Service (Amazon S3) bucket.

Deploying the sample serverless application

Prerequisites

Deployment steps using AWS SAM

The sample application is provided as an AWS SAM infrastructure as code template.

This demonstrative application will deploy an API without authorization. Please consider controlling and managing access to your APIs.

  1. Clone the GitHub repository:
    git clone https://github.com/aws-samples/sns-xray-active-tracing-blog-source-code
    cd sns-xray-active-tracing-blog-source-code
  2. Build the lab artifacts from source:
    sam build
  3. Deploy the sample solution into your AWS account:
    export AWS_REGION=$(aws --profile default configure get region)
    sam deploy \
    --stack-name wild-rydes-async-msg-2 \
    --capabilities CAPABILITY_IAM \
    --region $AWS_REGION \
    --guided

    Confirm SubmitRideCompletionFunction may not have authorization defined, Is this okay? [y/N]: with yes.

  4. Wait until the stack reaches status CREATE_COMPLETE.

See the sample application README.md for detailed deployment instructions.

Testing the application

Once the application is successfully deployed, generate messages and validate that the SNS topic is publishing all messages:

  1. Look up the API Gateway endpoint:
    export AWS_REGION=$(aws --profile default configure get region)
    aws cloudformation describe-stacks \
    --stack-name wild-rydes-async-msg-2 \
    --query 'Stacks[].Outputs[?OutputKey==`UnicornManagementServiceApiSubmitRideCompletionEndpoint`].OutputValue' \
    --output text
  2. Store this API Gateway endpoint in an environment variable:
    export ENDPOINT=$(aws cloudformation describe-stacks \
    --stack-name wild-rydes-async-msg-2 \
    --query 'Stacks[].Outputs[?OutputKey==`UnicornManagementServiceApiSubmitRideCompletionEndpoint`].OutputValue' \
    --output text)
  3. Send requests to the submit ride completion endpoint by executing the following command five or more times with varying payloads:
    curl -XPOST -i -H "Content-Type\:application/json" -d '{ "from": "Berlin", "to": "Frankfurt", "duration": 420, "distance": 600, "customer": "cmr", "fare": 256.50 }' $ENDPOINT
  4. Validate that messages are being passed in the application using the CloudWatch service map:
    Messages being passed on the CloudWatch service map

See the sample application README.md for detailed testing instructions.

The sample application shows various use-cases, which are described in the following sections.

Amazon SNS to Amazon SQS fanout scenario

A common application integration scenario for SNS is the Fanout scenario. In the Fanout scenario, a message published to an SNS topic is replicated and pushed to multiple endpoints, such as SQS queues. This allows for parallel asynchronous processing and is a common application integration pattern used in event-driven application architectures.

When an SNS topic fans out to SQS queues, the pattern is called topic-queue-chaining. This means that you add a queue, in our case an SQS queue, between the SNS topic and each of the subscriber services. As messages are buffered in a persistent manner in an SQS queue, no message is lost should a subscriber process run into issues for multiple hours or days, or experience exceptions or crashes.

By placing an SQS queue in front of each subscriber service, you can leverage the fact that a queue can act as a buffering load balancer. As every queue message is delivered to one of potentially many consumer processes, subscriber services can be easily scaled out and in, and the message load is distributed over the available consumer processes. In an event where suddenly a large number of messages arrives, the number of consumer processes has to be scaled out to cope with the additional load. This takes time and you need to wait until additional processes become operational. Since messages are buffered in the queue, you do not lose any messages in the process.

To summarize, in the Fanout scenario or the topic-queue-chaining pattern:

  • SNS replicates and pushes the message to multiple endpoints.
  • SQS decouples sending and receiving endpoints.

The fanout scenario is a common application integration scenario for SNS

With AWS X-Ray active tracing enabled on the SNS topic, the CloudWatch service map shows us the complete application architecture, as follows.

Fanout scenario with an SNS topic that fans out to SQS queues in the CloudWatch service map

Prior to the introduction of AWS X-Ray active tracing on the SNS topic, the AWS X-Ray service would not be able to reconstruct the full service map and the SQS nodes would be missing from the diagram.

To see the integration without AWS X-Ray active tracing enabled, open template.yaml and navigate to the resource RideCompletionTopic. Comment out the property TracingConfig: Active, redeploy and test the solution. The service map should then show an incomplete diagram where the SNS topic is linked directly to the consumer Lambda functions, omitting the SQS nodes.

For this use case, given the Fanout scenario, enabling AWS X-Ray active tracing on the SNS topic provides full end-to-end observability of the traces available in the application.

Amazon SNS to Amazon Kinesis Data Firehose delivery streams for message archiving and analytics

SNS is commonly used with Kinesis Data Firehose delivery streams for message archival and analytics use-cases. You can use SNS topics with Kinesis Data Firehose subscriptions to capture, transform, buffer, compress and upload data to Amazon S3, Amazon Redshift, Amazon OpenSearch Service, HTTP endpoints, and third-party service providers.

We will implement this pattern as follows:

  • An SNS topic to replicate and push the message to its subscribers.
  • A Kinesis Data Firehose delivery stream to capture and buffer messages.
  • An S3 bucket to receive uploaded messages for archival.

Message archiving and analytics using Kinesis Data Firehose delivery streams consumer to the SNS topic

In order to demonstrate this pattern, an additional consumer has been added to the SNS topic. The same Fanout pattern applies and the Kinesis Data Firehose delivery stream receives messages from the SNS topic alongside the existing consumers.

The Kinesis Data Firehose delivery stream buffers messages and is configured to deliver them to an S3 bucket for archival purposes. Optionally, an SNS message filter could be added to this subscription to select relevant messages for archival.

With AWS X-Ray active tracing enabled on the SNS topic, the Kinesis Data Firehose node will appear on the CloudWatch service map as a separate entity, as can be seen in the following figure. It is worth noting that the S3 bucket does not appear on the CloudWatch service map as Kinesis does not yet support AWS X-Ray active tracing at the time of writing of this blog post.

Kinesis Data Firehose delivery streams consumer to the SNS topic in the CloudWatch Service Map

Prior to the introduction of AWS X-Ray active tracing on the SNS topic, the AWS X-Ray service would not be able to reconstruct the full service map and the Kinesis Data Firehose node would be missing from the diagram. To see the integration without AWS X-Ray active tracing enabled, open template.yaml and navigate to the resource RideCompletionTopic. Comment out the property TracingConfig: Active, redeploy and test the solution. The service map should then show an incomplete diagram where the Kinesis Data Firehose node is missing.

For this use case, given the data archival scenario with Kinesis Delivery Firehose, enabling AWS X-Ray active tracing on the SNS topic provides additional visibility on the Kinesis Data Firehose node in the CloudWatch service map.

Review faults, errors, and message delivery latency on the AWS X-Ray trace details page

The AWS X-Ray trace details page provides a timeline with resource metadata, faults, errors, and message delivery latency for each segment.

With AWS X-Ray active tracing enabled on SNS, additional segments for the SNS topic itself, but also the downstream consumers (AWS::SNS::Topic, AWS::SQS::Queue and AWS::KinesisFirehose) segments are available, providing additional faults, errors, and message delivery latency for these segments. This allows you to analyze latencies in your messages and their backend services. For example, how long a message spends in a topic, and how long it took to deliver the message to each of the topic’s subscriptions.

Additional faults, errors, and message delivery latency information on AWS X-Ray trace details page

Enabling AWS X-Ray active tracing for SNS

AWS X-Ray active tracing is not enabled by default on SNS topics and needs to be explicitly enabled.

The example application used in this blog post demonstrates how to enable active tracing using AWS SAM.

You can enable AWS X-Ray active tracing using the SNS SetTopicAttributes API, SNS Management Console, or via AWS CloudFormation. See Active tracing in Amazon SNS in the Amazon SNS Developer Guide for more options.

Cleanup

To clean up the resources provisioned as part of the sample serverless application, follow the instructions as outlined in the sample application README.md.

Conclusion

AWS X-Ray active tracing for SNS enables end-to-end visibility in real-world scenarios involving patterns like SNS to SQS and SNS to Amazon Kinesis.

But it is not only useful for these patterns. With AWS X-Ray active tracing enabled for SNS, you can identify bottlenecks and monitor the health of event-driven applications by looking at segment details for SNS topics and consumers, such as resource metadata, faults, errors, and message delivery latency for each subscriber.

Enable AWS X-Ray active tracing for SNS to gain accurate visibility of your end-to-end tracing.

For more serverless learning resources, visit Serverless Land.

Developing portable AWS Lambda functions

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/developing-portable-aws-lambda-functions/

This blog post is written by Uri Segev, Principal Serverless Specialist Solutions Architect

When developing new applications or modernizing existing ones, you might face a dilemma: which compute technology to use? A serverless compute service such as AWS Lambda or maybe containers? Often, serverless can be the better approach thanks to automatic scaling, built-in high availability, and a pay-for-use billing model. However, you may hesitate to choose serverless for reasons such as:

  • Perceived higher cost or difficulty in estimating cost
  • It is a paradigm shift, which requires learning to bridge the knowledge gap
  • Misconceptions about Lambda capabilities and use cases
  • Concern that using Lambda will result in lock-in
  • Existing investments in non-serverless platforms and tooling

This blog post suggests best practices for developing portable Lambda functions that allow you to easily port your code to containers if you later choose to. By doing so, you can avoid lock-in and try out the serverless approach in a risk-free way.

Each section of this blog post describes what you need to consider when writing portable code and the steps needed to migrate this code from Lambda to containers, if you later choose to do so.

Best practices for portable Lambda functions

Separate business logic and Lambda handler

Lambda functions are event-driven in nature. When a specific event happens, it invokes the Lambda function by calling its handler method. The handler method receives an event object which contains information regarding the reason for the function invocation. Once the function execution completes, it returns from the handler method. Whatever is returned from the handler is the function’s return value.

To write portable code, we recommend using the handler method only as an interface between the Lambda runtime (event object) and the business logic. Using Hexagonal architecture terminology, the handler should be a driving adapter making calls into the port, which is the interface exposed by the business logic The handler should extract all required information from the event object and then call a separate method that implements the business logic.

When that method returns, the handler constructs the result in the format expected by the function invoker and returns it. We also recommend splitting the handler code and the business logic code into separate files. Should you choose to migrate to containers later, you simply migrate your business logic code files with no additional changes.

The following pseudocode shows a Lambda handler that extracts information from the event object and calls the business logic. Once the business logic is done, the handler places the response in the function’s return value:

import business_logic

# The Lambda handler extracts needed information from the event
# object and invokes the business logic
handler(event, context) {
  # Extract needed information from event object payload = event[‘payload’]

  # Invoke business logic
  result = do_some_logic(payload)
  
  # Construct result for API Gateway
  return {
    statusCode: 200,
	body: result
  }
}

The following pseudocode shows the business logic. It’s located in a separate file and is unaware that it is being invoked from a Lambda function. It is pure logic.

# This is the business logic. It knows nothing about who invokes it.
do_some_logic(data) {
result = "This is my result."
  return result
}

This approach also makes it easier to run unit tests on the business logic without the need to construct event objects and to invoke the Lambda handler.

If you migrate to containers later, you include the business logic files in your container with new interface code as described in the following section.

Event source integration

One benefit of Lambda functions is the event source integration. For instance, if you integrate Lambda with Amazon Simple Queue Service (Amazon SQS), the Lambda service will take care of polling the queue, invoking the Lambda function and deleting the messages from the queue when done. By using this integration, you need to write less boilerplate code. You can focus only on implementing business logic and not the integration with the event source.

The following pseudocode shows how the Lambda handler looks like for an SQS event source:

import business_logic

handler(event, context) {
  entries = []
  # Iterate over all the messages in the event object
  for message in event[‘Records’] {
    # Call the business logic to process a single message
    success = handle_message(message)

    # Start building the response
    if Not success {
      entries.append({
      'itemIdentifier': message['messageId']
      })
    }
  }

  # Notify Lambda about failed items.
  if (let(entries) > 0) {
    return {
      'batchItemFailures': entries
    }
  }
}

As you can see in the previous code, the Lambda function has almost no knowledge that it is being invoked from SQS. There are no SQS API calls. It only knows the structure of the event object, which is specific to SQS.

When moving to a container, the integration responsibility moves from the Lambda service to you, the developer. There are different event sources in AWS, and each of them will require a different approach for consuming events and invoking business logic. For example, if the event source is Amazon API Gateway, your application will need to create an HTTP server that listens on an HTTP port and waits for incoming requests in order to invoke the business logic.

If the event source is Amazon Kinesis Data Streams, your application will need to run a poller that reads records from the shards, keep track of processed records, handle the case of a change in the number of shards in the stream, retry on errors, and more. Regardless of the event source, if you follow the previous recommendations, you will not need to change anything in the business logic code.

The following pseudocode shows how the integration with SQS will look like in a container. Note that you will lose some features such as batching, filtering, and, of course, automatic scaling.

import aws_sdk
import business_logic

QUEUE_URL = os.environ['QUEUE_URL']
BATCH_SIZE = os.environ.get('BATCH_SIZE', 1)
sqs_client = aws_sdk.client('sqs')

main() {
  # Infinite loop to poll for messages from SQS
  while True {

    # Receive a batch of messages from the queue
    response = sqs_client.receive_message(
      QueueUrl = QUEUE_URL,
      MaxNumberOfMessages = BATCH_SIZE,
      WaitTimeSeconds = 20 )

    # Loop over the messages in the batch
    entries = []
    i = 1
    for message in response.get('Messages',[]) {
      # Process a single message
      success = handle_message(message)

      # Append the message handle to an array that is later
      # used to delete processed messages
      if success {
        entries.append(
          {
            'Id': f'index{i}',
            'ReceiptHandle': message['receiptHandle']
          }
        )
        i += 1
      }
    }

    # Delete all the processed messages
    if (len(entries) > 0) {
      sqs_client.delete_message_batch(
        QueueUrl = QUEUE_URL,
        Entries = entries
      )
    }
  }
}

Another point to consider here is Lambda destinations. If your function is invoked asynchronously and you configured a destination for your function, you will need to include that in the interface code. It will need to catch any business logic error and, based on that, invoke the right destination.

Package functions as containers

Lambda supports packaging functions as .zip files and container images. To develop portable code, we recommend using container images as your default packaging method. Even though you package the function as a container image, you can’t run it on other container platforms such as Amazon Elastic Container Service (Amazon ECS) or Amazon Elastic Kubernetes Service (EKS). However, by packaging it this way, the migration to containers later will be easier as you are already using the same tools and you already created a Dockerfile that will require minimal changes.

An example Dockerfile for Lambda looks like this:

FROM public.ecr.aws/lambda/python:3.9
COPY *.py requirements.txt ./
RUN python3.9 -m pip install -r requirements.txt -t .
CMD ["app.lambda_handler"]

If you move to containers later, you will need to change the Dockerfile to use a different base image and adapt the CMD line that defines how to start the application. This is in addition to the code changes described in the previous section.

The corresponding Dockerfile for the container will look like this:

FROM python:3.9
COPY *.py requirements.txt ./
RUN python3.9 -m pip install -r requirements.txt -t .
CMD ["python", "./app.py"]

The deployment pipeline also needs to change as we deploy to a different target. However, building the artifacts remains the same.

Single invocation per instance

Lambda functions run in their own isolated runtime environment. Each environment handles a single request at a time which works great for Lambda. However, if you migrate your application to containers, you will likely invoke the business logic from multiple threads in a single process at the same time.

This section discusses aspects of moving from a single invocation to multiple concurrent invocations within the same process.

Static variables

Static variables are those that are instantiated once and then reused across multiple invocations. Examples of such variables are database connections or configuration information.

For function optimization, and specifically for reducing cold starts and the duration of warm function invocations, we recommend initializing all static variables outside the function handler and storing them in global variables so that further invocations will reuse them.

We recommend using an initialization function that you write as part of the business logic module and that you invoke from outside the handler. This function saves information in global variables that the business logic code reuses across invocations.

The following pseudocode shows the Lambda function:

import business_logic

# Call the initialization code
initialize()

handler(event, context) {
  ...
  # Call the business logic
  ...
}

And the business logic code will look like this:

# Global variables used to store static data
var config

initialize() {
  config = read_Config()
}

do_some_logic(data) {
  # Do something with config object
  ...
}

The same also applies to containers. You will usually initialize static variables when the process starts and not for every single request. When moving to containers, all you need to do is call the initialization function before starting the main application loop.

import business_logic

# Call the initialization code
initialize()

main() {
  while True {
    ...
    # Call the business logic
    ...
  }
}

As you can see, there are no changes in the business logic code.

Database connections

As Lambda functions share nothing between the runtime environments, unlike containers they can’t rely on connection pools when connecting to a relational database. For this reason, we created Amazon RDS Proxy, which acts as a centralized connection pool used by many functions.

To write portable Lambda functions, we recommend using a connection pool object with a single connection. Your business logic code will always ask for a connection from the pool when making a database request. You will still need to use RDS Proxy.

If you later move to containers, you can increase the number of connections in the pool to a larger number with no further changes and the application will scale without overwhelming the database.

File system

Lambda functions come with a writable /tmp folder in the size of 512 MB to 10 GB. As each function instance runs in an isolated runtime environment, developers usually use fixed file names for files stored in that folder. If you run the same business logic code in a container in multiple threads, the different threads will overwrite the files created by others.

We recommended using unique file names in each invocation. Append a UUID or another random number to the file name. Delete the files once you are done with them to avoid running out of space.

If you move your code to containers later, there is nothing to do.

Portable web applications

If you develop a web application, there is another way to achieve portability. You can use the AWS Lambda Web Adapter project to host a web app inside a Lambda function. This way you can develop a web application with familiar frameworks (e.g., Express.js, Next.js, Flask, Spring Boot, Laravel, or anything that uses HTTP 1.1/1.0), and run it on Lambda. If you package your web application as a container, the same Docker image can run on Lambda (using the web adapter) and containers.

Porting from containers to Lambda

This blog post demonstrates how to develop portable Lambda functions you can easily port to containers. Taking these recommendations into consideration can also help develop portable code in general, which allows you to port containers to Lambda functions.

Some things to consider:

  • Separate the business logic from the interface code in the container. The interface code should interact with the event sources and invoke the business logic.
  • As Lambda functions only have a /tmp writable folder, replicate this in your containers (even though you could write to different locations).

Conclusion

This blog post suggests best practices for developing Lambda functions that allow you to gain the benefits of a serverless approach without risking lock-in.

By following these best practices for separating business logic from Lambda handlers, packaging functions as containers, handling Lambda’s single invocation per instance, and more, you can develop portable Lambda functions. As a consequence, you will be able to port your code from Lambda to containers with minimal effort if you choose to move to containers later.

Refer to these best practices and code samples to ease the adoption of a serverless approach when developing your next application.

For more serverless learning resources, visit Serverless Land.

Migrating to token-based authentication for iOS applications with Amazon SNS

Post Syndicated from Pascal Vogel original https://aws.amazon.com/blogs/compute/migrating-to-token-based-authentication-for-ios-applications-with-amazon-sns/

This post is written by Yashlin Naidoo, Cloud Support Engineer.

Amazon Simple Notification Service (Amazon SNS) enables you to send notifications directly to a mobile push endpoint. For iOS apps, Amazon SNS dispatches the notification on your application’s behalf to the Apple Push Notification service (APNs).

To send mobile push notifications via Amazon SNS, you must provide a set of credentials to connect to the APNs (see Prerequisites for Amazon SNS user notifications).

Amazon SNS supports two methods for authenticating with iOS mobile push endpoints when sending a mobile push notification via the APNs:

  • Certificate-based authentication
  • Token-based authentication

To use certificate-based authentication, you must configure Amazon SNS with a provider certificate. Amazon SNS will use this certificate on your behalf to establish a secure connection with the APNs to dispatch your mobile push notifications. For each application that you support, you will need to provide unique certificates.

As the number of applications you manage grows, you will also need to create and manage an increasing number of certificates. Furthermore, certificates expire yearly, and you must renew them to ensure that Amazon SNS can continue to send mobile push notifications on your behalf. To learn more about how to use certificate-based authentication, see Certificate-based authentication for iOS applications with Amazon SNS on the AWS Compute Blog.

For new and existing iOS applications, we recommend that you use token-based authentication. To learn more about how to use token-based authentication, see Token-Based authentication for iOS applications with Amazon SNS on the AWS Compute Blog.

There are several benefits in using token-based authentication:

  • You can use a single token that is shared among all of your applications.
  • You can remove the need for yearly certificate renewal for certificate-based authentication.
  • You can improve the security of your application by using token-based requests. For these requests, your credentials are never transferred from Amazon SNS to your mobile push notification provider, making the communication less likely to be compromised.

Token-based authentication is the latest authentication method provided by the APNs that improves security for your applications, requires less management effort, and is more efficient. We recommend migrating as soon as possible to ensure the security and ease of operations of your applications.

This blog post provides step-by-step instructions for migrating your iOS application from certificate-based authentication to token-based authentication with Amazon SNS. You will learn how to create a new token using your Apple developer account. Next, you will migrate your platform application to token-based authentication. Finally, you will test your application by sending a test push notification via Amazon SNS to a device to confirm the successful migration.

Prerequisites

  • XCode IDE
  • iOS application with a valid p.12 certificate

Before proceeding with this migration, we recommend to stop sending push notifications to your applications until the migration is complete to avoid any disruptions in your message delivery workloads.

Walkthrough

You can also create a test platform application with token-based authentication to ensure that the Amazon SNS platform application is created successfully. Finally, you can create a device token and send a test push notification to it. Once confirmed that the application works correctly, you can migrate your main platform application to token-based authentication.

Creating a .p8 token to upload to Amazon SNS

  1. Log in to your Apple Developer account.
  2. Choose Certificates, Identifiers & Profiles.
  3. In the Keys section, choose the Add button (+).
  4. Under Register a New Key, for Key Name, type the token key name and tick the box for Apple Push Notifications service (APNs) for the key services.
  5. Select Continue.
  6. In the Register a New Key section, check that all values were entered correctly.
  7. Select Register to register the new token key.
  8. Download your token key. Store it in a safe location, as you can’t download the token key again.

Migrating your platform application from certificate-based authentication to token-based authentication

  1. Navigate to the Amazon SNS console. Expand the Mobile menu and choose Push Notification.
  2. Choose your platform application.
  3. Choose Edit. Under Apple credentials section choose Token:
    1. Under Token, select Choose file to upload the .p8 token key file.
    2. Provide values for signing key ID, team ID and bundle ID. These values can be found in your Apple Developer account. Ensure that your bundle ID is identical to the ID used for this application with certificate-based authentication.
  4. Event notifications – optional: refer to the following guide for enabling event notifications: Mobile app events
  5. Delivery status logging – optional: refer to the following guide for enabling delivery status logging: How do I access Amazon SNS topic delivery logs for push notifications? Find more information on these steps can in the Mobile push notifications best practices.
    Apple credential settings
  6. Choose Save changes. This changes your platform application to token-based authentication.

Testing push notification delivery to your device

In this section, you will test sending a push notification to your device using the Amazon SNS console and the AWS Command Line Interface (AWS CLI).

Amazon SNS console

  1. From the Amazon SNS console, navigate to your platform endpoint and choose Publish message.
  2. For message body, select Custom payload for each delivery protocol to send to the endpoint. This example uses a custom payload that allows you to provide additional APNs headers:
    Custom payload for each delivery model configuration
  3. Choose Publish message.
  4. The push notification is delivered to your device:
    iOS sample notification message

AWS CLI

Note: If you receive errors when running AWS CLI commands, make sure that you’re using the most recent AWS CLI version.
Run the following command. For target-arn, specify your platform application endpoint ARN:

aws sns publish \
    --target-arn arn:aws:sns:us-west-2:191418023309:endpoint/APNS_SANDBOX/computeblogdemo/ba7a35f8-c73d-364f-9edd-5c438add0533 \
    --message '{"APNS_SANDBOX": "{\"aps\":{\"alert\":\"Sample message for iOS development endpoints\"}}"}' \
    --message-attributes '{"AWS.SNS.MOBILE.APNS.PUSH_TYPE":{"DataType":"String","StringValue":"alert"}}' \
    --message-structure json
  1. An output containing a MessageId is shown in case of successful delivery:
    {
        "MessageId": "83ecb3a1-c728-5b7c-96e5-e8417d5cd4f4"
    }
  2. The push notification is delivered to your device:
    iOS sample notification message

Troubleshooting

You might encounter various errors when migrating to token-based authentication. This section explains how to troubleshoot these errors.

If a message is not delivered after publishing it to your platform application endpoint, refer to the Amazon CloudWatch failed logs of your platform application. These logs are named sns/your-aws-region/your-accountID/app/platform_name/application_name/Failure.

Once you have navigated to your platform application’s CloudWatch failed log group, click on one of the log streams based on the time that you published the message. Focus on the following attributes:

  • statusCode: error messages are grouped according to the status code.
  • status : shows whether a message was delivered successfully to the provider or if it failed to deliver.
  • providerResponse: provides the response message from the provider and is only shown in case a message failed to deliver.

We will look through messages that failed to deliver because of the following errors:

InvalidProviderToken

"providerResponse": "{\"reason\":\"InvalidProviderToken\"}",
"statusCode": 403,
"status": "FAILURE"

The cause of this error can be an incorrect token key ID, team ID or if the token is invalid.

To resolve this issue, go to your Apple Developer account and ensure that you are providing the correct token ID, team ID and that your token key exists.

TopicDisallowed

"providerResponse": "{\"reason\":\"TopicDisallowed\"}",
"statusCode": 400,
"status": "FAILURE"

The cause of this error can be an incorrect bundle ID or a device token that was created with the wrong bundle ID.

To resolve this issue, go to your Apple Developer account and navigate to your existing certificate used when migrating to token-based authentication. Confirm the bundle ID assigned to this certificate and ensure you are using the same ID for your platform application and also for your device tokens.

Conclusion

Developers can send mobile push notifications for the APNs using token-based authentication by using a .p8 key to authenticate an Apple device endpoint. This is the recommended authentication method due to improved security and lower management effort by removing the need for annual certificate renewal and by being able to share tokens among multiple applications.

To learn more about APNs token-based authentication with Amazon SNS, visit the Amazon SNS Developer Guide.

For more serverless learning resources, visit Serverless Land.