Tag Archives: Kinesis Data Firehose

Uplevel your data architecture with real- time streaming using Amazon Data Firehose and Snowflake

Post Syndicated from Swapna Bandla original https://aws.amazon.com/blogs/big-data/uplevel-your-data-architecture-with-real-time-streaming-using-amazon-data-firehose-and-snowflake/

Today’s fast-paced world demands timely insights and decisions, which is driving the importance of streaming data. Streaming data refers to data that is continuously generated from a variety of sources. The sources of this data, such as clickstream events, change data capture (CDC), application and service logs, and Internet of Things (IoT) data streams are proliferating. Snowflake offers two options to bring streaming data into its platform: Snowpipe and Snowflake Snowpipe Streaming. Snowpipe is suitable for file ingestion (batching) use cases, such as loading large files from Amazon Simple Storage Service (Amazon S3) to Snowflake. Snowpipe Streaming, a newer feature released in March 2023, is suitable for rowset ingestion (streaming) use cases, such as loading a continuous stream of data from Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK).

Before Snowpipe Streaming, AWS customers used Snowpipe for both use cases: file ingestion and rowset ingestion. First, you ingested streaming data to Kinesis Data Streams or Amazon MSK, then used Amazon Data Firehose to aggregate and write streams to Amazon S3, followed by using Snowpipe to load the data into Snowflake. However, this multi-step process can result in delays of up to an hour before data is available for analysis in Snowflake. Moreover, it’s expensive, especially when you have small files that Snowpipe has to upload to the Snowflake customer cluster.

To solve this issue, Amazon Data Firehose now integrates with Snowpipe Streaming, enabling you to capture, transform, and deliver data streams from Kinesis Data Streams, Amazon MSK, and Firehose Direct PUT to Snowflake in seconds at a low cost. With a few clicks on the Amazon Data Firehose console, you can set up a Firehose stream to deliver data to Snowflake. There are no commitments or upfront investments to use Amazon Data Firehose, and you only pay for the amount of data streamed.

Some key features of Amazon Data Firehose include:

  • Fully managed serverless service – You don’t need to manage resources, and Amazon Data Firehose automatically scales to match the throughput of your data source without ongoing administration.
  • Straightforward to use with no code – You don’t need to write applications.
  • Real-time data delivery – You can get data to your destinations quickly and efficiently in seconds.
  • Integration with over 20 AWS services – Seamless integration is available for many AWS services, such as Kinesis Data Streams, Amazon MSK, Amazon VPC Flow Logs, AWS WAF logs, Amazon CloudWatch Logs, Amazon EventBridge, AWS IoT Core, and more.
  • Pay-as-you-go model – You only pay for the data volume that Amazon Data Firehose processes.
  • Connectivity – Amazon Data Firehose can connect to public or private subnets in your VPC.

This post explains how you can bring streaming data from AWS into Snowflake within seconds to perform advanced analytics. We explore common architectures and illustrate how to set up a low-code, serverless, cost-effective solution for low-latency data streaming.

Overview of solution

The following are the steps to implement the solution to stream data from AWS to Snowflake:

  1. Create a Snowflake database, schema, and table.
  2. Create a Kinesis data stream.
  3. Create a Firehose delivery stream with Kinesis Data Streams as the source and Snowflake as its destination using a secure private link.
  4. To test the setup, generate sample stream data from the Amazon Kinesis Data Generator (KDG) with the Firehose delivery stream as the destination.
  5. Query the Snowflake table to validate the data loaded into Snowflake.

The solution is depicted in the following architecture diagram.

Prerequisites

You should have the following prerequisites:

Create a Snowflake database, schema, and table

Complete the following steps to set up your data in Snowflake:

  • Log in to your Snowflake account and create the database:
    create database adf_snf;

  • Create a schema in the new database:
    create schema adf_snf.kds_blog;

  • Create a table in the new schema:
    create or replace table iot_sensors
    (sensorId number,
    sensorType varchar,
    internetIP varchar,
    connectionTime timestamp_ntz,
    currentTemperature number
    );

Create a Kinesis data stream

Complete the following steps to create your data stream:

  • On the Kinesis Data Streams console, choose Data streams in the navigation pane.
  • Choose Create data stream.
  • For Data stream name, enter a name (for example, KDS-Demo-Stream).
  • Leave the remaining settings as default.
  • Choose Create data stream.

Create a Firehose delivery stream

Complete the following steps to create a Firehose delivery stream with Kinesis Data Streams as the source and Snowflake as its destination:

  • On the Amazon Data Firehose console, choose Create Firehose stream.
  • For Source, choose Amazon Kinesis Data Streams.
  • For Destination, choose Snowflake.
  • For Kinesis data stream, browse to the data stream you created earlier.
  • For Firehose stream name, leave the default generated name or enter a name of your preference.
  • Under Connection settings, provide the following information to connect Amazon Data Firehose to Snowflake:
    • For Snowflake account URL, enter your Snowflake account URL.
    • For User, enter the user name generated in the prerequisites.
    • For Private key, enter the private key generated in the prerequisites. Make sure the private key is in PKCS8 format. Do not include the PEM header-BEGIN prefix and footer-END suffix as part of the private key. If the key is split across multiple lines, remove the line breaks.
    • For Role, select Use custom Snowflake role and enter the IAM role that has access to write to the database table.

You can connect to Snowflake using public or private connectivity. If you don’t provide a VPC endpoint, the default connectivity mode is public. To allow list Firehose IPs in your Snowflake network policy, refer to Choose Snowflake for Your Destination. If you’re using a private link URL, provide the VPCE ID using SYSTEM$GET_PRIVATELINK_CONFIG:

select SYSTEM$GET_PRIVATELINK_CONFIG();

This function returns a JSON representation of the Snowflake account information necessary to facilitate the self-service configuration of private connectivity to the Snowflake service, as shown in the following screenshot.

  • For this post, we’re using a private link, so for VPCE ID, enter the VPCE ID.
  • Under Database configuration settings, enter your Snowflake database, schema, and table names.
  • In the Backup settings section, for S3 backup bucket, enter the bucket you created as part of the prerequisites.
  • Choose Create Firehose stream.

Alternatively, you can use an AWS CloudFormation template to create the Firehose delivery stream with Snowflake as the destination rather than using the Amazon Data Firehose console.

To use the CloudFormation stack, choose

BDB-4100-CFN-Launch-Stack

Generate sample stream data
Generate sample stream data from the KDG with the Kinesis data stream you created:

{ 
"sensorId": {{random.number(999999999)}}, 
"sensorType": "{{random.arrayElement( ["Thermostat","SmartWaterHeater","HVACTemperatureSensor","WaterPurifier"] )}}", 
"internetIP": "{{internet.ip}}", 
"connectionTime": "{{date.now("YYYY-MM-DDTHH:m:ss")}}", 
"currentTemperature": {{random.number({"min":10,"max":150})}} 
}

Query the Snowflake table

Query the Snowflake table:

select * from adf_snf.kds_blog.iot_sensors;

You can confirm that the data generated by the KDG that was sent to Kinesis Data Streams is loaded into the Snowflake table through Amazon Data Firehose.

Troubleshooting

If data is not loaded into Kinesis Data Steams after the KDG sends data to the Firehose delivery stream, refresh and make sure you are logged in to the KDG.

If you made any changes to the Snowflake destination table definition, recreate the Firehose delivery stream.

Clean up

To avoid incurring future charges, delete the resources you created as part of this exercise if you are not planning to use them further.

Conclusion

Amazon Data Firehose provides a straightforward way to deliver data to Snowpipe Streaming, enabling you to save costs and reduce latency to seconds. To try Amazon Kinesis Firehose with Snowflake, refer to the Amazon Data Firehose with Snowflake as destination lab.


About the Authors

Swapna Bandla is a Senior Solutions Architect in the AWS Analytics Specialist SA Team. Swapna has a passion towards understanding customers data and analytics needs and empowering them to develop cloud-based well-architected solutions. Outside of work, she enjoys spending time with her family.

Mostafa Mansour is a Principal Product Manager – Tech at Amazon Web Services where he works on Amazon Kinesis Data Firehose. He specializes in developing intuitive product experiences that solve complex challenges for customers at scale. When he’s not hard at work on Amazon Kinesis Data Firehose, you’ll likely find Mostafa on the squash court, where he loves to take on challengers and perfect his dropshots.

Bosco Albuquerque is a Sr. Partner Solutions Architect at AWS and has over 20 years of experience working with database and analytics products from enterprise database vendors and cloud providers. He has helped technology companies design and implement data analytics solutions and products.

Deliver decompressed Amazon CloudWatch Logs to Amazon S3 and Splunk using Amazon Data Firehose

Post Syndicated from Ranjit Kalidasan original https://aws.amazon.com/blogs/big-data/deliver-decompressed-amazon-cloudwatch-logs-to-amazon-s3-and-splunk-using-amazon-data-firehose/

You can use Amazon Data Firehose to aggregate and deliver log events from your applications and services captured in Amazon CloudWatch Logs to your Amazon Simple Storage Service (Amazon S3) bucket and Splunk destinations, for use cases such as data analytics, security analysis, application troubleshooting etc. By default, CloudWatch Logs are delivered as gzip-compressed objects. You might want the data to be decompressed, or want logs to be delivered to Splunk, which requires decompressed data input, for application monitoring and auditing.

AWS released a feature to support decompression of CloudWatch Logs in Firehose. With this new feature, you can specify an option in Firehose to decompress CloudWatch Logs. You no longer have to perform additional processing using AWS Lambda or post-processing to get decompressed logs, and can deliver decompressed data to Splunk. Additionally, you can use optional Firehose features such as record format conversion to convert CloudWatch Logs to Parquet or ORC, and dynamic partitioning to automatically group streaming records based on keys in the data (for example, by month) and deliver the grouped records to corresponding Amazon S3 prefixes.

In this post, we look at how to enable the decompression feature for Splunk and Amazon S3 destinations. We start with Splunk and then Amazon S3 for new streams, then we address migration steps to take advantage of this feature and simplify your existing pipeline.

Decompress CloudWatch Logs for Splunk

You can use subscription filter in CloudWatch log groups to ingest data directly to Firehose or through Amazon Kinesis Data Streams.

Note: For the CloudWatch Logs decompression feature, you need a HTTP Event Collector (HEC) data input created in Splunk, with indexer acknowledgement enabled and the source type. This is required to map to the right source type for the decompressed logs. When creating the HEC input, include the source type mapping (for example, aws:cloudtrail).

To create a Firehose delivery stream for the decompression feature, complete the following steps:

  1. Provide your destination settings and select Raw endpoint as endpoint type.

You can use a raw endpoint for the decompression feature to ingest both raw and JSON-formatted event data to Splunk. For example, VPC Flow Logs data is raw data, and AWS CloudTrail data is in JSON format.

  1. Enter the HEC token for Authentication token.
  2. To enable decompression feature, deselect Transform source records with AWS Lambda under Transform records.
  3. Select Turn on decompression and Turn on message extraction for Decompress source records from Amazon CloudWatch Logs.
  4. Select Turn on message extraction for the Splunk destination.

Message extraction feature

After decompression, CloudWatch Logs are in JSON format, as shown in the following figure. You can see the decompressed data has metadata information such as logGroup, logStream, and subscriptionFilters, and the actual data is included within the message field under logEvents (the following example shows an example of CloudTrail events in the CloudWatch Logs).

When you enable message extraction, Firehose will extract just the contents of the message fields and concatenate the contents with a new line between them, as shown in following figure. With the CloudWatch Logs metadata filtered out with this feature, Splunk will successfully parse the actual log data and map to the source type configured in HEC token.

Additionally, If you want to deliver these CloudWatch events to your Splunk destination in real time, you can use zero buffering, a new feature that was launched recently in Firehose. You can use this feature to set up 0 seconds as the buffer interval or any time interval between 0–60 seconds to deliver data to the Splunk destination in real time within seconds.

With these settings, you can now seamlessly ingest decompressed CloudWatch log data into Splunk using Firehose.

Decompress CloudWatch Logs for Amazon S3

The CloudWatch Logs decompression feature for an Amazon S3 destination works similar to Splunk, where you can turn off data transformation using Lambda and turn on the decompression and message extraction options. You can use the decompression feature to write the log data as a text file to the Amazon S3 destination or use with other Amazon S3 destination features like record format conversion using Parquet or ORC, or dynamic partitioning to partition the data.

Dynamic partitioning with decompression

For Amazon S3 destination, Firehose supports dynamic partitioning, which enables you to continuously partition streaming data by using keys within data, and then deliver the data grouped by these keys into corresponding Amazon S3 prefixes. This enables you to run high-performance, cost-efficient analytics on streaming data in Amazon S3 using services such as Amazon Athena, Amazon EMR, Amazon Redshift Spectrum, and Amazon QuickSight. Partitioning your data minimizes the amount of data scanned, optimizes performance, and reduces costs of your analytics queries on Amazon S3.

With the new decompression feature, you can perform dynamic partitioning without any Lambda function for mapping the partitioning keys on CloudWatch Logs. You can enable the Inline parsing for JSON option, scan the decompressed log data, and select the partitioning keys. The following screenshot shows an example where inline parsing is enabled for CloudTrail log data with a partitioning schema selected for account ID and AWS Region in the CloudTrail record.

Record format conversion with decompression

For CloudWatch Logs data, you can use the record format conversion feature on decompressed data for Amazon S3 destination. Firehose can convert the input data format from JSON to Apache Parquet or Apache ORC before storing the data in Amazon S3. Parquet and ORC are columnar data formats that save space and enable faster queries compared to row-oriented formats like JSON. You can use the features for record format conversion under the Transform and convert records settings to convert the CloudWatch log data to Parquet or ORC format. The following screenshot shows an example of record format conversion settings for Parquet format using an AWS Glue schema and table for CloudTrail log data. When the dynamic partitioning settings are configured, record format conversion works along with dynamic partitioning to create the files in the output format with a partition folder structure in the target S3 bucket.

Migrate existing delivery streams for decompression

If you want to migrate an existing Firehose stream that uses Lambda for decompression to this new decompression feature of Firehose, refer to the steps outlined in Enabling and disabling decompression.

Pricing

The Firehose decompression feature decompress the data and charges per GB of decompressed data. To understand decompression pricing, refer to Amazon Data Firehose pricing.

Clean up

To avoid incurring future charges, delete the resources you created in the following order:

  1. Delete the CloudWatch Logs subscription filter.
  2. Delete the Firehose delivery stream.
  3. Delete the S3 buckets.

Conclusion

The decompression and message extraction feature of Firehose simplifies delivery of CloudWatch Logs to Amazon S3 and Splunk destinations without requiring any code development or additional processing. For an Amazon S3 destination, you can use Parquet or ORC conversion and dynamic partitioning capabilities on decompressed data.

For more information, refer to the following resources:


About the Authors

Ranjit Kalidasan is a Senior Solutions Architect with Amazon Web Services based in Boston, Massachusetts. He is a Partner Solutions Architect helping security ISV partners co-build and co-market solutions with AWS. He brings over 25 years of experience in information technology helping global customers implement complex solutions for security and analytics. You can connect with Ranjit on LinkedIn.

Phaneendra Vuliyaragoli is a Product Management Lead for Amazon Data Firehose at AWS. In this role, Phaneendra leads the product and go-to-market strategy for Amazon Data Firehose.

Exploring real-time streaming for generative AI Applications

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/exploring-real-time-streaming-for-generative-ai-applications/

Foundation models (FMs) are large machine learning (ML) models trained on a broad spectrum of unlabeled and generalized datasets. FMs, as the name suggests, provide the foundation to build more specialized downstream applications, and are unique in their adaptability. They can perform a wide range of different tasks, such as natural language processing, classifying images, forecasting trends, analyzing sentiment, and answering questions. This scale and general-purpose adaptability are what makes FMs different from traditional ML models. FMs are multimodal; they work with different data types such as text, video, audio, and images. Large language models (LLMs) are a type of FM and are pre-trained on vast amounts of text data and typically have application uses such as text generation, intelligent chatbots, or summarization.

Streaming data facilitates the constant flow of diverse and up-to-date information, enhancing the models’ ability to adapt and generate more accurate, contextually relevant outputs. This dynamic integration of streaming data enables generative AI applications to respond promptly to changing conditions, improving their adaptability and overall performance in various tasks.

To better understand this, imagine a chatbot that helps travelers book their travel. In this scenario, the chatbot needs real-time access to airline inventory, flight status, hotel inventory, latest price changes, and more. This data usually comes from third parties, and developers need to find a way to ingest this data and process the data changes as they happen.

Batch processing is not the best fit in this scenario. When data changes rapidly, processing it in a batch may result in stale data being used by the chatbot, providing inaccurate information to the customer, which impacts the overall customer experience. Stream processing, however, can enable the chatbot to access real-time data and adapt to changes in availability and price, providing the best guidance to the customer and enhancing the customer experience.

Another example is an AI-driven observability and monitoring solution where FMs monitor real-time internal metrics of a system and produces alerts. When the model finds an anomaly or abnormal metric value, it should immediately produce an alert and notify the operator. However, the value of such important data diminishes significantly over time. These notifications should ideally be received within seconds or even while it’s happening. If operators receive these notifications minutes or hours after they happened, such an insight is not actionable and has potentially lost its value. You can find similar use cases in other industries such as retail, car manufacturing, energy, and the financial industry.

In this post, we discuss why data streaming is a crucial component of generative AI applications due to its real-time nature. We discuss the value of AWS data streaming services such as Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, Amazon Managed Service for Apache Flink, and Amazon Kinesis Data Firehose in building generative AI applications.

In-context learning

LLMs are trained with point-in-time data and have no inherent ability to access fresh data at inference time. As new data appears, you will have to continuously fine-tune or further train the model. This is not only an expensive operation, but also very limiting in practice because the rate of new data generation far supersedes the speed of fine-tuning. Additionally, LLMs lack contextual understanding and rely solely on their training data, and are therefore prone to hallucinations. This means they can generate a fluent, coherent, and syntactically sound but factually incorrect response. They are also devoid of relevance, personalization, and context.

LLMs, however, have the capacity to learn from the data they receive from the context to more accurately respond without modifying the model weights. This is called in-context learning, and can be used to produce personalized answers or provide an accurate response in the context of organization policies.

For example, in a chatbot, data events could pertain to an inventory of flights and hotels or price changes that are constantly ingested to a streaming storage engine. Furthermore, data events are filtered, enriched, and transformed to a consumable format using a stream processor. The result is made available to the application by querying the latest snapshot. The snapshot constantly updates through stream processing; therefore, the up-to-date data is provided in the context of a user prompt to the model. This allows the model to adapt to the latest changes in price and availability. The following diagram illustrates a basic in-context learning workflow.

A commonly used in-context learning approach is to use a technique called Retrieval Augmented Generation (RAG). In RAG, you provide the relevant information such as most relevant policy and customer records along with the user question to the prompt. This way, the LLM generates an answer to the user question using additional information provided as context. To learn more about RAG, refer to Question answering using Retrieval Augmented Generation with foundation models in Amazon SageMaker JumpStart.

A RAG-based generative AI application can only produce generic responses based on its training data and the relevant documents in the knowledge base. This solution falls short when a near-real-time personalized response is expected from the application. For example, a travel chatbot is expected to consider the user’s current bookings, available hotel and flight inventory, and more. Moreover, the relevant customer personal data (commonly known as the unified customer profile) is usually subject to change. If a batch process is employed to update the generative AI’s user profile database, the customer may receive dissatisfying responses based on old data.

In this post, we discuss the application of stream processing to enhance a RAG solution used for building question answering agents with context from real-time access to unified customer profiles and organizational knowledge base.

Near-real-time customer profile updates

Customer records are typically distributed across data stores within an organization. For your generative AI application to provide a relevant, accurate, and up-to-date customer profile, it is vital to build streaming data pipelines that can perform identity resolution and profile aggregation across the distributed data stores. Streaming jobs constantly ingest new data to synchronize across systems and can perform enrichment, transformations, joins, and aggregations across windows of time more efficiently. Change data capture (CDC) events contain information about the source record, updates, and metadata such as time, source, classification (insert, update, or delete), and the initiator of the change.

The following diagram illustrates an example workflow for CDC streaming ingestion and processing for unified customer profiles.

In this section, we discuss the main components of a CDC streaming pattern required to support RAG-based generative AI applications.

CDC streaming ingestion

A CDC replicator is a process that collects data changes from a source system (usually by reading transaction logs or binlogs) and writes CDC events with the exact same order they occurred in a streaming data stream or topic. This involves a log-based capture with tools such as AWS Database Migration Service (AWS DMS) or open source connectors such as Debezium for Apache Kafka connect. Apache Kafka Connect is part of the Apache Kafka environment, allowing data to be ingested from various sources and delivered to variety of destinations. You can run your Apache Kafka connector on Amazon MSK Connect within minutes without worrying about configuration, setup, and operating an Apache Kafka cluster. You only need to upload your connector’s compiled code to Amazon Simple Storage Service (Amazon S3) and set up your connector with your workload’s specific configuration.

There are also other methods for capturing data changes. For example, Amazon DynamoDB provides a feature for streaming CDC data to Amazon DynamoDB Streams or Kinesis Data Streams. Amazon S3 provides a trigger to invoke an AWS Lambda function when a new document is stored.

Streaming storage

Streaming storage functions as an intermediate buffer to store CDC events before they get processed. Streaming storage provides reliable storage for streaming data. By design, it is highly available and resilient to hardware or node failures and maintains the order of the events as they are written. Streaming storage can store data events either permanently or for a set period of time. This allows stream processors to read from part of the stream if there is a failure or a need for re-processing. Kinesis Data Streams is a serverless streaming data service that makes it straightforward to capture, process, and store data streams at scale. Amazon MSK is a fully managed, highly available, and secure service provided by AWS for running Apache Kafka.

Stream processing

Stream processing systems should be designed for parallelism to handle high data throughput. They should partition the input stream between multiple tasks running on multiple compute nodes. Tasks should be able to send the result of one operation to the next one over the network, making it possible for processing data in parallel while performing operations such as joins, filtering, enrichment, and aggregations. Stream processing applications should be able to process events with regards to the event time for use cases where events could arrive late or correct computation relies on the time events occur rather than the system time. For more information, refer to Notions of Time: Event Time and Processing Time.

Stream processes continuously produce results in the form of data events that need to be output to a target system. A target system could be any system that can integrate directly with the process or via streaming storage as in intermediary. Depending on the framework you choose for stream processing, you will have different options for target systems depending on available sink connectors. If you decide to write the results to an intermediary streaming storage, you can build a separate process that reads events and applies changes to the target system, such as running an Apache Kafka sink connector. Regardless of which option you choose, CDC data needs extra handling due to its nature. Because CDC events carry information about updates or deletes, it’s important that they merge in the target system in the right order. If changes are applied in the wrong order, the target system will be out of sync with its source.

Apache Flink is a powerful stream processing framework known for its low latency and high throughput capabilities. It supports event time processing, exactly-once processing semantics, and high fault tolerance. Additionally, it provides native support for CDC data via a special structure called dynamic tables. Dynamic tables mimic the source database tables and provide a columnar representation of the streaming data. The data in dynamic tables changes with every event that is processed. New records can be appended, updated, or deleted at any time. Dynamic tables abstract away the extra logic you need to implement for each record operation (insert, update, delete) separately. For more information, refer to Dynamic Tables.

With Amazon Managed Service for Apache Flink, you can run Apache Flink jobs and integrate with other AWS services. There are no servers and clusters to manage, and there is no compute and storage infrastructure to set up.

AWS Glue is a fully managed extract, transform, and load (ETL) service, which means AWS handles the infrastructure provisioning, scaling, and maintenance for you. Although it’s primarily known for its ETL capabilities, AWS Glue can also be used for Spark streaming applications. AWS Glue can interact with streaming data services such as Kinesis Data Streams and Amazon MSK for processing and transforming CDC data. AWS Glue can also seamlessly integrate with other AWS services such as Lambda, AWS Step Functions, and DynamoDB, providing you with a comprehensive ecosystem for building and managing data processing pipelines.

Unified customer profile

Overcoming the unification of the customer profile across a variety of source systems requires the development of robust data pipelines. You need data pipelines that can bring and synchronize all records into one data store. This data store provides your organization with the holistic customer records view that is needed for operational efficiency of RAG-based generative AI applications. For building such a data store, an unstructured data store would be best.

An identity graph is a useful structure for creating a unified customer profile because it consolidates and integrates customer data from various sources, ensures data accuracy and deduplication, offers real-time updates, connects cross-systems insights, enables personalization, enhances customer experience, and supports regulatory compliance. This unified customer profile empowers the generative AI application to understand and engage with customers effectively, and adhere to data privacy regulations, ultimately enhancing customer experiences and driving business growth. You can build your identity graph solution using Amazon Neptune, a fast, reliable, fully managed graph database service.

AWS provides a few other managed and serverless NoSQL storage service offerings for unstructured key-value objects. Amazon DocumentDB (with MongoDB compatibility) is a fast, scalable, highly available, and fully managed enterprise document database service that supports native JSON workloads. DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability.

Near-real-time organizational knowledge base updates

Similar to customer records, internal knowledge repositories such as company policies and organizational documents are siloed across storage systems. This is typically unstructured data and is updated in a non-incremental fashion. The use of unstructured data for AI applications is effective using vector embeddings, which is a technique of representing high dimensional data such as text files, images, and audio files as multi-dimensional numeric.

AWS provides several vector engine services, such as Amazon OpenSearch Serverless, Amazon Kendra, and Amazon Aurora PostgreSQL-Compatible Edition with the pgvector extension for storing vector embeddings. Generative AI applications can enhance the user experience by transforming the user prompt into a vector and use it to query the vector engine to retrieve contextually relevant information. Both the prompt and the vector data retrieved are then passed to the LLM to receive a more precise and personalized response.

The following diagram illustrates an example stream-processing workflow for vector embeddings.

Knowledge base contents need to be converted to vector embeddings before being written to the vector data store. Amazon Bedrock or Amazon SageMaker can help you access the model of your choice and expose a private endpoint for this conversion. Furthermore, you can use libraries such as LangChain to integrate with these endpoints. Building a batch process can help you convert your knowledge base content to vector data and store it in a vector database initially. However, you need to rely on an interval to reprocess the documents to synchronize your vector database with changes in your knowledge base content. With a large number of documents, this process can be inefficient. Between these intervals, your generative AI application users will receive answers according to the old content, or will receive an inaccurate answer because the new content is not vectorized yet.

Stream processing is an ideal solution for these challenges. It produces events as per existing documents initially and further monitors the source system and creates a document change event as soon as they occur. These events can be stored in streaming storage and wait to be processed by a streaming job. A streaming job reads these events, loads the content of the document, and transforms the contents to an array of related tokens of words. Each token further transforms into vector data via an API call to an embedding FM. Results are sent for storage to the vector storage via a sink operator.

If you’re using Amazon S3 for storing your documents, you can build an event-source architecture based on S3 object change triggers for Lambda. A Lambda function can create an event in the desired format and write that to your streaming storage.

You can also use Apache Flink to run as a streaming job. Apache Flink provides the native FileSystem source connector, which can discover existing files and read their contents initially. After that, it can continuously monitor your file system for new files and capture their content. The connector supports reading a set of files from distributed file systems such as Amazon S3 or HDFS with a format of plain text, Avro, CSV, Parquet, and more, and produces a streaming record. As a fully managed service, Managed Service for Apache Flink removes the operational overhead of deploying and maintaining Flink jobs, allowing you to focus on building and scaling your streaming applications. With seamless integration into the AWS streaming services such as Amazon MSK or Kinesis Data Streams, it provides features like automatic scaling, security, and resiliency, providing reliable and efficient Flink applications for handling real-time streaming data.

Based on your DevOps preference, you can choose between Kinesis Data Streams or Amazon MSK for storing the streaming records. Kinesis Data Streams simplifies the complexities of building and managing custom streaming data applications, allowing you to focus on deriving insights from your data rather than infrastructure maintenance. Customers using Apache Kafka often opt for Amazon MSK due to its straightforwardness, scalability, and dependability in overseeing Apache Kafka clusters within the AWS environment. As a fully managed service, Amazon MSK takes on the operational complexities associated with deploying and maintaining Apache Kafka clusters, enabling you to concentrate on constructing and expanding your streaming applications.

Because a RESTful API integration suits the nature of this process, you need a framework that supports a stateful enrichment pattern via RESTful API calls to track for failures and retry for the failed request. Apache Flink again is a framework that can do stateful operations in at-memory speed. To understand the best ways to make API calls via Apache Flink, refer to Common streaming data enrichment patterns in Amazon Kinesis Data Analytics for Apache Flink.

Apache Flink provides native sink connectors for writing data to vector datastores such as Amazon Aurora for PostgreSQL with pgvector or Amazon OpenSearch Service with VectorDB. Alternatively, you can stage the Flink job’s output (vectorized data) in an MSK topic or a Kinesis data stream. OpenSearch Service provides support for native ingestion from Kinesis data streams or MSK topics. For more information, refer to Introducing Amazon MSK as a source for Amazon OpenSearch Ingestion and Loading streaming data from Amazon Kinesis Data Streams.

Feedback analytics and fine-tuning

It’s important for data operation managers and AI/ML developers to get insight about the performance of the generative AI application and the FMs in use. To achieve that, you need to build data pipelines that calculate important key performance indicator (KPI) data based on the user feedback and variety of application logs and metrics. This information is useful for stakeholders to gain real-time insight about the performance of the FM, the application, and overall user satisfaction about the quality of support they receive from your application. You also need to collect and store the conversation history for further fine-tuning your FMs to improve their ability in performing domain-specific tasks.

This use case fits very well in the streaming analytics domain. Your application should store each conversation in streaming storage. Your application can prompt users about their rating of each answer’s accuracy and their overall satisfaction. This data can be in a format of a binary choice or a free form text. This data can be stored in a Kinesis data stream or MSK topic, and get processed to generate KPIs in real time. You can put FMs to work for users’ sentiment analysis. FMs can analyze each answer and assign a category of user satisfaction.

Apache Flink’s architecture allows for complex data aggregation over windows of time. It also provides support for SQL querying over stream of data events. Therefore, by using Apache Flink, you can quickly analyze raw user inputs and generate KPIs in real time by writing familiar SQL queries. For more information, refer to Table API & SQL.

With Amazon Managed Service for Apache Flink Studio, you can build and run Apache Flink stream processing applications using standard SQL, Python, and Scala in an interactive notebook. Studio notebooks are powered by Apache Zeppelin and use Apache Flink as the stream processing engine. Studio notebooks seamlessly combine these technologies to make advanced analytics on data streams accessible to developers of all skill sets. With support for user-defined functions (UDFs), Apache Flink allows for building custom operators to integrate with external resources such as FMs for performing complex tasks such as sentiment analysis. You can use UDFs to compute various metrics or enrich user feedback raw data with additional insights such as user sentiment. To learn more about this pattern, refer to Proactively addressing customer concern in real-time with GenAI, Flink, Apache Kafka, and Kinesis.

With Managed Service for Apache Flink Studio, you can deploy your Studio notebook as a streaming job with one click. You can use native sink connectors provided by Apache Flink to send the output to your storage of choice or stage it in a Kinesis data stream or MSK topic. Amazon Redshift and OpenSearch Service are both ideal for storing analytical data. Both engines provide native ingestion support from Kinesis Data Streams and Amazon MSK via a separate streaming pipeline to a data lake or data warehouse for analysis.

Amazon Redshift uses SQL to analyze structured and semi-structured data across data warehouses and data lakes, using AWS-designed hardware and machine learning to deliver the best price-performance at scale. OpenSearch Service offers visualization capabilities powered by OpenSearch Dashboards and Kibana (1.5 to 7.10 versions).

You can use the outcome of such analysis combined with user prompt data for fine-tuning the FM when is needed. SageMaker is the most straightforward way to fine-tune your FMs. Using Amazon S3 with SageMaker provides a powerful and seamless integration for fine-tuning your models. Amazon S3 serves as a scalable and durable object storage solution, enabling straightforward storage and retrieval of large datasets, training data, and model artifacts. SageMaker is a fully managed ML service that simplifies the entire ML lifecycle. By using Amazon S3 as the storage backend for SageMaker, you can benefit from the scalability, reliability, and cost-effectiveness of Amazon S3, while seamlessly integrating it with SageMaker training and deployment capabilities. This combination enables efficient data management, facilitates collaborative model development, and makes sure that ML workflows are streamlined and scalable, ultimately enhancing the overall agility and performance of the ML process. For more information, refer to Fine-tune Falcon 7B and other LLMs on Amazon SageMaker with @remote decorator.

With a file system sink connector, Apache Flink jobs can deliver data to Amazon S3 in open format (such as JSON, Avro, Parquet, and more) files as data objects. If you prefer to manage your data lake using a transactional data lake framework (such as Apache Hudi, Apache Iceberg, or Delta Lake), all of these frameworks provide a custom connector for Apache Flink. For more details, refer to Create a low-latency source-to-data lake pipeline using Amazon MSK Connect, Apache Flink, and Apache Hudi.

Summary

For a generative AI application based on a RAG model, you need to consider building two data storage systems, and you need to build data operations that keep them up to date with all the source systems. Traditional batch jobs are not sufficient to process the size and diversity of the data you need to integrate with your generative AI application. Delays in processing the changes in source systems result in an inaccurate response and reduce the efficiency of your generative AI application. Data streaming enables you to ingest data from a variety of databases across various systems. It also allows you to transform, enrich, join, and aggregate data across many sources efficiently in near-real time. Data streaming provides a simplified data architecture to collect and transform users’ real-time reactions or comments on the application responses, helping you deliver and store the results in a data lake for model fine-tuning. Data streaming also helps you optimize data pipelines by processing only the change events, allowing you to respond to data changes more quickly and efficiently.

Learn more about AWS data streaming services and get started building your own data streaming solution.


About the Authors

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems which are reliable, secure, efficient, and cost-effective. He works backward from customer’s use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the Cloud.

Imtiaz (Taz) Sayed is the World-Wide Tech Leader for Analytics at AWS. He enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Build an end-to-end serverless streaming pipeline with Apache Kafka on Amazon MSK using Python

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-serverless-streaming-pipeline-with-apache-kafka-on-amazon-msk-using-python/

The volume of data generated globally continues to surge, from gaming, retail, and finance, to manufacturing, healthcare, and travel. Organizations are looking for more ways to quickly use the constant inflow of data to innovate for their businesses and customers. They have to reliably capture, process, analyze, and load the data into a myriad of data stores, all in real time.

Apache Kafka is a popular choice for these real-time streaming needs. However, it can be challenging to set up a Kafka cluster along with other data processing components that scale automatically depending on your application’s needs. You risk under-provisioning for peak traffic, which can lead to downtime, or over-provisioning for base load, leading to wastage. AWS offers multiple serverless services like Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Data Firehose, Amazon DynamoDB, and AWS Lambda that scale automatically depending on your needs.

In this post, we explain how you can use some of these services, including MSK Serverless, to build a serverless data platform to meet your real-time needs.

Solution overview

Let’s imagine a scenario. You’re responsible for managing thousands of modems for an internet service provider deployed across multiple geographies. You want to monitor the modem connectivity quality that has a significant impact on customer productivity and satisfaction. Your deployment includes different modems that need to be monitored and maintained to ensure minimal downtime. Each device transmits thousands of 1 KB records every second, such as CPU usage, memory usage, alarm, and connection status. You want real-time access to this data so you can monitor performance in real time, and detect and mitigate issues quickly. You also need longer-term access to this data for machine learning (ML) models to run predictive maintenance assessments, find optimization opportunities, and forecast demand.

Your clients that gather the data onsite are written in Python, and they can send all the data as Apache Kafka topics to Amazon MSK. For your application’s low-latency and real-time data access, you can use Lambda and DynamoDB. For longer-term data storage, you can use managed serverless connector service Amazon Data Firehose to send data to your data lake.

The following diagram shows how you can build this end-to-end serverless application.

end-to-end serverless application

Let’s follow the steps in the following sections to implement this architecture.

Create a serverless Kafka cluster on Amazon MSK

We use Amazon MSK to ingest real-time telemetry data from modems. Creating a serverless Kafka cluster is straightforward on Amazon MSK. It only takes a few minutes using the AWS Management Console or AWS SDK. To use the console, refer to Getting started using MSK Serverless clusters. You create a serverless cluster, AWS Identity and Access Management (IAM) role, and client machine.

Create a Kafka topic using Python

When your cluster and client machine are ready, SSH to your client machine and install Kafka Python and the MSK IAM library for Python.

  • Run the following commands to install Kafka Python and the MSK IAM library:
pip install kafka-python

pip install aws-msk-iam-sasl-signer-python
  • Create a new file called createTopic.py.
  • Copy the following code into this file, replacing the bootstrap_servers and region information with the details for your cluster. For instructions on retrieving the bootstrap_servers information for your MSK cluster, see Getting the bootstrap brokers for an Amazon MSK cluster.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers='<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# create topic
topic_name="mytopic"
topic_list =[NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if(topic_name not in existing_topics):
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("topic already exists!. List of topics are:" + str(existing_topics))
  • Run the createTopic.py script to create a new Kafka topic called mytopic on your serverless cluster:
python createTopic.py

Produce records using Python

Let’s generate some sample modem telemetry data.

  • Create a new file called kafkaDataGen.py.
  • Copy the following code into this file, updating the BROKERS and region information with the details for your cluster:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname='mytopic'

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface='eth4.1'
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
        
    except Exception as e:
        print(e.with_traceback())
  • Run the kafkaDataGen.py to continuously generate random data and publish it to the specified Kafka topic:
python kafkaDataGen.py

Store events in Amazon S3

Now you store all the raw event data in an Amazon Simple Storage Service (Amazon S3) data lake for analytics. You can use the same data to train ML models. The integration with Amazon Data Firehose allows Amazon MSK to seamlessly load data from your Apache Kafka clusters into an S3 data lake. Complete the following steps to continuously stream data from Kafka to Amazon S3, eliminating the need to build or manage your own connector applications:

  • On the Amazon S3 console, create a new bucket. You can also use an existing bucket.
  • Create a new folder in your S3 bucket called streamingDataLake.
  • On the Amazon MSK console, choose your MSK Serverless cluster.
  • On the Actions menu, choose Edit cluster policy.

cluster policy

  • Select Include Firehose service principal and choose Save changes.

firehose service principal

  • On the S3 delivery tab, choose Create delivery stream.

delivery stream

  • For Source, choose Amazon MSK.
  • For Destination, choose Amazon S3.

source and destination

  • For Amazon MSK cluster connectivity, select Private bootstrap brokers.
  • For Topic, enter a topic name (for this post, mytopic).

source settings

  • For S3 bucket, choose Browse and choose your S3 bucket.
  • Enter streamingDataLake as your S3 bucket prefix.
  • Enter streamingDataLakeErr as your S3 bucket error output prefix.

destination settings

  • Choose Create delivery stream.

create delivery stream

You can verify that the data was written to your S3 bucket. You should see that the streamingDataLake directory was created and the files are stored in partitions.

amazon s3

Store events in DynamoDB

For the last step, you store the most recent modem data in DynamoDB. This allows the client application to access the modem status and interact with the modem remotely from anywhere, with low latency and high availability. Lambda seamlessly works with Amazon MSK. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload.

Lets first create a table in DynamoDB. Refer to DynamoDB API permissions: Actions, resources, and conditions reference to verify that your client machine has the necessary permissions.

  • Create a new file called createTable.py.
  • Copy the following code into the file, updating the region information:
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name = 'device_status'
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")
  • Run the createTable.py script to create a table called device_status in DynamoDB:
python createTable.py

Now let’s configure the Lambda function.

  • On the Lambda console, choose Functions in the navigation pane.
  • Choose Create function.
  • Select Author from scratch.
  • For Function name¸ enter a name (for example, my-notification-kafka).
  • For Runtime, choose Python 3.11.
  • For Permissions, select Use an existing role and choose a role with permissions to read from your cluster.
  • Create the function.

On the Lambda function configuration page, you can now configure sources, destinations, and your application code.

  • Choose Add trigger.
  • For Trigger configuration, enter MSK to configure Amazon MSK as a trigger for the Lambda source function.
  • For MSK cluster, enter myCluster.
  • Deselect Activate trigger, because you haven’t configured your Lambda function yet.
  • For Batch size, enter 100.
  • For Starting position, choose Latest.
  • For Topic name¸ enter a name (for example, mytopic).
  • Choose Add.
  • On the Lambda function details page, on the Code tab, enter the following code:
import base64
import boto3
import json
import os
import random

def convertjson(payload):
    try:
        aa=json.loads(payload)
        return aa
    except:
        return 'err'

def lambda_handler(event, context):
    base64records = event['records']['mytopic-0']
    
    raw_records = [base64.b64decode(x["value"]).decode('utf-8') for x in base64records]
    
    for record in raw_records:
        item = json.loads(record)
        deviceid=item['deviceid']
        interface=item['interface']
        interfacestatus=item['interfacestatus']
        cpuusage=item['cpuusage']
        memoryusage=item['memoryusage']
        event_time=item['event_time']
        
        dynamodb = boto3.client('dynamodb')
        table_name = 'device_status'
        item = {
            'deviceid': {'S': deviceid},  
            'interface': {'S': interface},               
            'interface': {'S': interface},
            'interfacestatus': {'S': interfacestatus},
            'cpuusage': {'S': cpuusage},          
            'memoryusage': {'S': memoryusage},
            'event_time': {'S': event_time},
        }
        
        # Write the item to the DynamoDB table
        response = dynamodb.put_item(
            TableName=table_name,
            Item=item
        )
        
        print(f"Item written to DynamoDB")
  • Deploy the Lambda function.
  • On the Configuration tab, choose Edit to edit the trigger.

edit trigger

  • Select the trigger, then choose Save.
  • On the DynamoDB console, choose Explore items in the navigation pane.
  • Select the table device_status.

You will see Lambda is writing events generated in the Kafka topic to DynamoDB.

ddb table

Summary

Streaming data pipelines are critical for building real-time applications. However, setting up and managing the infrastructure can be daunting. In this post, we walked through how to build a serverless streaming pipeline on AWS using Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose, and other services. The key benefits are no servers to manage, automatic scalability of the infrastructure, and a pay-as-you-go model using fully managed services.

Ready to build your own real-time pipeline? Get started today with a free AWS account. With the power of serverless, you can focus on your application logic while AWS handles the undifferentiated heavy lifting. Let’s build something awesome on AWS!


About the Authors

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Michael Oguike is a Product Manager for Amazon MSK. He is passionate about using data to uncover insights that drive action. He enjoys helping customers from a wide range of industries improve their businesses using data streaming. Michael also loves learning about behavioral science and psychology from books and podcasts.

Gain insights from historical location data using Amazon Location Service and AWS analytics services

Post Syndicated from Alan Peaty original https://aws.amazon.com/blogs/big-data/gain-insights-from-historical-location-data-using-amazon-location-service-and-aws-analytics-services/

Many organizations around the world rely on the use of physical assets, such as vehicles, to deliver a service to their end-customers. By tracking these assets in real time and storing the results, asset owners can derive valuable insights on how their assets are being used to continuously deliver business improvements and plan for future changes. For example, a delivery company operating a fleet of vehicles may need to ascertain the impact from local policy changes outside of their control, such as the announced expansion of an Ultra-Low Emission Zone (ULEZ). By combining historical vehicle location data with information from other sources, the company can devise empirical approaches for better decision-making. For example, the company’s procurement team can use this information to make decisions about which vehicles to prioritize for replacement before policy changes go into effect.

Developers can use the support in Amazon Location Service for publishing device position updates to Amazon EventBridge to build a near-real-time data pipeline that stores locations of tracked assets in Amazon Simple Storage Service (Amazon S3). Additionally, you can use AWS Lambda to enrich incoming location data with data from other sources, such as an Amazon DynamoDB table containing vehicle maintenance details. Then a data analyst can use the geospatial querying capabilities of Amazon Athena to gain insights, such as the number of days their vehicles have operated in the proposed boundaries of an expanded ULEZ. Because vehicles that do not meet ULEZ emissions standards are subjected to a daily charge to operate within the zone, you can use the location data, along with maintenance data such as age of the vehicle, current mileage, and current emissions standards to estimate the amount the company would have to spend on daily fees.

This post shows how you can use Amazon Location, EventBridge, Lambda, Amazon Data Firehose, and Amazon S3 to build a location-aware data pipeline, and use this data to drive meaningful insights using AWS Glue and Athena.

Overview of solution

This is a fully serverless solution for location-based asset management. The solution consists of the following interfaces:

  • IoT or mobile application – A mobile application or an Internet of Things (IoT) device allows the tracking of a company vehicle while it is in use and transmits its current location securely to the data ingestion layer in AWS. The ingestion approach is not in scope of this post. Instead, a Lambda function in our solution simulates sample vehicle journeys and directly updates Amazon Location tracker objects with randomized locations.
  • Data analytics – Business analysts gather operational insights from multiple data sources, including the location data collected from the vehicles. Data analysts are looking for answers to questions such as, “How long did a given vehicle historically spend inside a proposed zone, and how much would the fees have cost had the policy been in place over the past 12 months?”

The following diagram illustrates the solution architecture.
Architecture diagram

The workflow consists of the following key steps:

  1. The tracking functionality of Amazon Location is used to track the vehicle. Using EventBridge integration, filtered positional updates are published to an EventBridge event bus. This solution uses distance-based filtering to reduce costs and jitter. Distanced-based filtering ignores location updates in which devices have moved less than 30 meters (98.4 feet).
  2. Amazon Location device position events arrive on the EventBridge default bus with source: ["aws.geo"] and detail-type: ["Location Device Position Event"]. One rule is created to forward these events to two downstream targets: a Lambda function, and a Firehose delivery stream.
  3. Two different patterns, based on each target, are described in this post to demonstrate different approaches to committing the data to a S3 bucket:
    1. Lambda function – The first approach uses a Lambda function to demonstrate how you can use code in the data pipeline to directly transform the incoming location data. You can modify the Lambda function to fetch additional vehicle information from a separate data store (for example, a DynamoDB table or a Customer Relationship Management system) to enrich the data, before storing the results in an S3 bucket. In this model, the Lambda function is invoked for each incoming event.
    2. Firehose delivery stream – The second approach uses a Firehose delivery stream to buffer and batch the incoming positional updates, before storing them in an S3 bucket without modification. This method uses GZIP compression to optimize storage consumption and query performance. You can also use the data transformation feature of Data Firehose to invoke a Lambda function to perform data transformation in batches.
  4. AWS Glue crawls both S3 bucket paths, populates the AWS Glue database tables based on the inferred schemas, and makes the data available to other analytics applications through the AWS Glue Data Catalog.
  5. Athena is used to run geospatial queries on the location data stored in the S3 buckets. The Data Catalog provides metadata that allows analytics applications using Athena to find, read, and process the location data stored in Amazon S3.
  6. This solution includes a Lambda function that continuously updates the Amazon Location tracker with simulated location data from fictitious journeys. The Lambda function is triggered at regular intervals using a scheduled EventBridge rule.

You can test this solution yourself using the AWS Samples GitHub repository. The repository contains the AWS Serverless Application Model (AWS SAM) template and Lambda code required to try out this solution. Refer to the instructions in the README file for steps on how to provision and decommission this solution.

Visual layouts in some screenshots in this post may look different than those on your AWS Management Console.

Data generation

In this section, we discuss the steps to manually or automatically generate journey data.

Manually generate journey data

You can manually update device positions using the AWS Command Line Interface (AWS CLI) command aws location batch-update-device-position. Replace the tracker-name, device-id, Position, and SampleTime values with your own, and make sure that successive updates are more than 30 meters in distance apart to place an event on the default EventBridge event bus:

aws location batch-update-device-position --tracker-name <tracker-name> --updates "[{\"DeviceId\": \"<device-id>\", \"Position\": [<longitude>, <latitude>], \"SampleTime\": \"<YYYY-MM-DDThh:mm:ssZ>\"}]"

Automatically generate journey data using the simulator

The provided AWS CloudFormation template deploys an EventBridge scheduled rule and an accompanying Lambda function that simulates tracker updates from vehicles. This rule is enabled by default, and runs at a frequency specified by the SimulationIntervalMinutes CloudFormation parameter. The data generation Lambda function updates the Amazon Location tracker with a randomized position offset from the vehicles’ base locations.

Vehicle names and base locations are stored in the vehicles.json file. A vehicle’s starting position is reset each day, and base locations have been chosen to give them the ability to drift in and out of the ULEZ on a given day to provide a realistic journey simulation.

You can disable the rule temporarily by navigating to the scheduled rule details on the EventBridge console. Alternatively, change the parameter State: ENABLED to State: DISABLED for the scheduled rule resource GenerateDevicePositionsScheduleRule in the template.yml file. Rebuild and re-deploy the AWS SAM template for this change to take effect.

Location data pipeline approaches

The configurations outlined in this section are deployed automatically by the provided AWS SAM template. The information in this section is provided to describe the pertinent parts of the solution.

Amazon Location device position events

Amazon Location sends device position update events to EventBridge in the following format:

{
    "version":"0",
    "id":"<event-id>",
    "detail-type":"Location Device Position Event",
    "source":"aws.geo",
    "account":"<account-number>",
    "time":"<YYYY-MM-DDThh:mm:ssZ>",
    "region":"<region>",
    "resources":[
        "arn:aws:geo:<region>:<account-number>:tracker/<tracker-name>"
    ],
    "detail":{
        "EventType":"UPDATE",
        "TrackerName":"<tracker-name>",
        "DeviceId":"<device-id>",
        "SampleTime":"<YYYY-MM-DDThh:mm:ssZ>",
        "ReceivedTime":"<YYYY-MM-DDThh:mm:ss.sssZ>",
        "Position":[
            <longitude>, 
            <latitude>
	]
    }
}

You can optionally specify an input transformation to modify the format and contents of the device position event data before it reaches the target.

Data enrichment using Lambda

Data enrichment in this pattern is facilitated through the invocation of a Lambda function. In this example, we call this function ProcessDevicePosition, and use a Python runtime. A custom transformation is applied in the EventBridge target definition to receive the event data in the following format:

{
    "EventType":<EventType>,
    "TrackerName":<TrackerName>,
    "DeviceId":<DeviceId>,
    "SampleTime":<SampleTime>,
    "ReceivedTime":<ReceivedTime>,
    "Position":[<Longitude>,<Latitude>]
}

You could apply additional transformations, such as the refactoring of Latitude and Longitude data into separate key-value pairs if this is required by the downstream business logic processing the events.

The following code demonstrates the Python application logic that is run by the ProcessDevicePosition Lambda function. Error handling has been skipped in this code snippet for brevity. The full code is available in the GitHub repo.

import json
import os
import uuid
import boto3

# Import environment variables from Lambda function.
bucket_name = os.environ["S3_BUCKET_NAME"]
bucket_prefix = os.environ["S3_BUCKET_LAMBDA_PREFIX"]

s3 = boto3.client("s3")

def lambda_handler(event, context):
    key = "%s/%s/%s-%s.json" % (bucket_prefix,
                                event["DeviceId"],
                                event["SampleTime"],
                                str(uuid.uuid4())
    body = json.dumps(event, separators=(",", ":"))
    body_encoded = body.encode("utf-8")
    s3.put_object(Bucket=bucket_name, Key=key, Body=body_encoded)
    return {
        "statusCode": 200,
        "body": "success"
    }

The preceding code creates an S3 object for each device position event received by EventBridge. The code uses the DeviceId as a prefix to write the objects to the bucket.

You can add additional logic to the preceding Lambda function code to enrich the event data using other sources. The example in the GitHub repo demonstrates enriching the event with data from a DynamoDB vehicle maintenance table.

In addition to the prerequisite AWS Identity and Access Management (IAM) permissions provided by the role AWSBasicLambdaExecutionRole, the ProcessDevicePosition function requires permissions to perform the S3 put_object action and any other actions required by the data enrichment logic. IAM permissions required by the solution are documented in the template.yml file.

{
    "Version":"2012-10-17",
    "Statement":[
        {
            "Action":[
                "s3:ListBucket"
            ],
            "Resource":[
                "arn:aws:s3:::<S3_BUCKET_NAME>"
            ],
            "Effect":"Allow"
        },
        {
            "Action":[
                "s3:PutObject"
            ],
            "Resource":[
                "arn:aws:s3:::<S3_BUCKET_NAME>/<S3_BUCKET_LAMBDA_PREFIX>/*"
            ],
            "Effect":"Allow"
        }
    ]
}

Data pipeline using Amazon Data Firehose

Complete the following steps to create your Firehose delivery stream:

  1. On the Amazon Data Firehose console, choose Firehose streams in the navigation pane.
  2. Choose Create Firehose stream.
  3. For Source, choose as Direct PUT.
  4. For Destination, choose Amazon S3.
  5. For Firehose stream name, enter a name (for this post, ProcessDevicePositionFirehose).
    Create Firehose stream
  6. Configure the destination settings with details about the S3 bucket in which the location data is stored, along with the partitioning strategy:
    1. Use <S3_BUCKET_NAME> and <S3_BUCKET_FIREHOSE_PREFIX> to determine the bucket and object prefixes.
    2. Use DeviceId as an additional prefix to write the objects to the bucket.
  7. Enable Dynamic partitioning and New line delimiter to make sure partitioning is automatic based on DeviceId, and that new line delimiters are added between records in objects that are delivered to Amazon S3.

These are required by AWS Glue to later crawl the data, and for Athena to recognize individual records.
Destination settings for Firehose stream

Create an EventBridge rule and attach targets

The EventBridge rule ProcessDevicePosition defines two targets: the ProcessDevicePosition Lambda function, and the ProcessDevicePositionFirehose delivery stream. Complete the following steps to create the rule and attach targets:

  1. On the EventBridge console, create a new rule.
  2. For Name, enter a name (for this post, ProcessDevicePosition).
  3. For Event bus¸ choose default.
  4. For Rule type¸ select Rule with an event pattern.
    EventBridge rule detail
  5. For Event source, select AWS events or EventBridge partner events.
    EventBridge event source
  6. For Method, select Use pattern form.
  7. In the Event pattern section, specify AWS services as the source, Amazon Location Service as the specific service, and Location Device Position Event as the event type.
    EventBridge creation method
  8. For Target 1, attach the ProcessDevicePosition Lambda function as a target.
    EventBridge target 1
  9. We use Input transformer to customize the event that is committed to the S3 bucket.
    EventBridge target 1 transformer
  10. Configure Input paths map and Input template to organize the payload into the desired format.
    1. The following code is the input paths map:
      {
          EventType: $.detail.EventType
          TrackerName: $.detail.TrackerName
          DeviceId: $.detail.DeviceId
          SampleTime: $.detail.SampleTime
          ReceivedTime: $.detail.ReceivedTime
          Longitude: $.detail.Position[0]
          Latitude: $.detail.Position[1]
      }

    2. The following code is the input template:
      {
          "EventType":<EventType>,
          "TrackerName":<TrackerName>,
          "DeviceId":<DeviceId>,
          "SampleTime":<SampleTime>,
          "ReceivedTime":<ReceivedTime>,
          "Position":[<Longitude>, <Latitude>]
      }

  11. For Target 2, choose the ProcessDevicePositionFirehose delivery stream as a target.
    EventBridge target 2

This target requires an IAM role that allows one or multiple records to be written to the Firehose delivery stream:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "firehose:PutRecord",
                "firehose:PutRecords"
            ],
            "Resource": [
                "arn:aws:firehose:<region>:<account-id>:deliverystream/<delivery-stream-name>"
            ],
            "Effect": "Allow"
        }
    ]
}

Crawl and catalog the data using AWS Glue

After sufficient data has been generated, complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Select the crawlers that have been created, location-analytics-glue-crawler-lambda and location-analytics-glue-crawler-firehose.
  3. Choose Run.

The crawlers will automatically classify the data into JSON format, group the records into tables and partitions, and commit associated metadata to the AWS Glue Data Catalog.
Crawlers

  1. When the Last run statuses of both crawlers show as Succeeded, confirm that two tables (lambda and firehose) have been created on the Tables page.

The solution partitions the incoming location data based on the deviceid field. Therefore, as long as there are no new devices or schema changes, the crawlers don’t need to run again. However, if new devices are added, or a different field is used for partitioning, the crawlers need to run again.
Tables

You’re now ready to query the tables using Athena.

Query the data using Athena

Athena is a serverless, interactive analytics service built to analyze unstructured, semi-structured, and structured data where it is hosted. If this is your first time using the Athena console, follow the instructions to set up a query result location in Amazon S3. To query the data with Athena, complete the following steps:

  1. On the Athena console, open the query editor.
  2. For Data source, choose AwsDataCatalog.
  3. For Database, choose location-analytics-glue-database.
  4. On the options menu (three vertical dots), choose Preview Table to query the content of both tables.
    Preview table

The query displays 10 sample positional records currently stored in the table. The following screenshot is an example from previewing the firehose table. The firehose table stores raw, unmodified data from the Amazon Location tracker.
Query results
You can now experiment with geospatial queries.The GeoJSON file for the 2021 London ULEZ expansion is part of the repository, and has already been converted into a query compatible with both Athena tables.

  1. Copy and paste the content from the 1-firehose-athena-ulez-2021-create-view.sql file found in the examples/firehose folder into the query editor.

This query uses the ST_Within geospatial function to determine if a recorded position is inside or outside the ULEZ zone defined by the polygon. A new view called ulezvehicleanalysis_firehose is created with a new column, insidezone, which captures whether the recorded position exists within the zone.

A simple Python utility is provided, which converts the polygon features found in the downloaded GeoJSON file into ST_Polygon strings based on the well-known text format that can be used directly in an Athena query.

  1. Choose Preview View on the ulezvehicleanalysis_firehose view to explore its content.
    Preview view

You can now run queries against this view to gain overarching insights.

  1. Copy and paste the content from the 2-firehose-athena-ulez-2021-query-days-in-zone.sql file found in the examples/firehose folder into the query editor.

This query establishes the total number of days each vehicle has entered ULEZ, and what the expected total charges would be. The query has been parameterized using the ? placeholder character. Parameterized queries allow you to rerun the same query with different parameter values.

  1. Enter the daily fee amount for Parameter 1, then run the query.
    Query editor

The results display each vehicle, the total number of days spent in the proposed ULEZ, and the total charges based on the daily fee you entered.
Query results
You can repeat this exercise using the lambda table. Data in the lambda table is augmented with additional vehicle details present in the vehicle maintenance DynamoDB table at the time it is processed by the Lambda function. The solution supports the following fields:

  • MeetsEmissionStandards (Boolean)
  • Mileage (Number)
  • PurchaseDate (String, in YYYY-MM-DD format)

You can also enrich the new data as it arrives.

  1. On the DynamoDB console, find the vehicle maintenance table under Tables. The table name is provided as output VehicleMaintenanceDynamoTable in the deployed CloudFormation stack.
  2. Choose Explore table items to view the content of the table.
  3. Choose Create item to create a new record for a vehicle.
    Create item
  4. Enter DeviceId (such as vehicle1 as a String), PurchaseDate (such as 2005-10-01 as a String), Mileage (such as 10000 as a Number), and MeetsEmissionStandards (with a value such as False as Boolean).
  5. Choose Create item to create the record.
    Create item
  6. Duplicate the newly created record with additional entries for other vehicles (such as for vehicle2 or vehicle3), modifying the values of the attributes slightly each time.
  7. Rerun the location-analytics-glue-crawler-lambda AWS Glue crawler after new data has been generated to confirm that the update to the schema with new fields is registered.
  8. Copy and paste the content from the 1-lambda-athena-ulez-2021-create-view.sql file found in the examples/lambda folder into the query editor.
  9. Preview the ulezvehicleanalysis_lambda view to confirm that the new columns have been created.

If errors such as Column 'mileage' cannot be resolved are displayed, the data enrichment is not taking place, or the AWS Glue crawler has not yet detected updates to the schema.

If the Preview table option is only returning results from before you created records in the DynamoDB table, return the query results in descending order using sampletime (for example, order by sampletime desc limit 100;).
Query results
Now we focus on the vehicles that don’t currently meet emissions standards, and order the vehicles in descending order based on the mileage per year (calculated using the latest mileage / age of vehicle in years).

  1. Copy and paste the content from the 2-lambda-athena-ulez-2021-query-days-in-zone.sql file found in the examples/lambda folder into the query editor.
    Query results

In this example, we can see that out of our fleet of vehicles, five have been reported as not meeting emission standards. We can also see the vehicles that have accumulated high mileage per year, and the number of days spent in the proposed ULEZ. The fleet operator may now decide to prioritize these vehicles for replacement. Because location data is enriched with the most up-to-date vehicle maintenance data at the time it is ingested, you can further evolve these queries to run over a defined time window. For example, you could factor in mileage changes within the past year.

Due to the dynamic nature of the data enrichment, any new data being committed to Amazon S3, along with the query results, will be altered as and when records are updated in the DynamoDB vehicle maintenance table.

Clean up

Refer to the instructions in the README file to clean up the resources provisioned for this solution.

Conclusion

This post demonstrated how you can use Amazon Location, EventBridge, Lambda, Amazon Data Firehose, and Amazon S3 to build a location-aware data pipeline, and use the collected device position data to drive analytical insights using AWS Glue and Athena. By tracking these assets in real time and storing the results, companies can derive valuable insights on how effectively their fleets are being utilized and better react to changes in the future. You can now explore extending this sample code with your own device tracking data and analytics requirements.


About the Authors

Alan Peaty is a Senior Partner Solutions Architect at AWS. Alan helps Global Systems Integrators (GSIs) and Global Independent Software Vendors (GISVs) solve complex customer challenges using AWS services. Prior to joining AWS, Alan worked as an architect at systems integrators to translate business requirements into technical solutions. Outside of work, Alan is an IoT enthusiast and a keen runner who loves to hit the muddy trails of the English countryside.

Parag Srivastava is a Solutions Architect at AWS, helping enterprise customers with successful cloud adoption and migration. During his professional career, he has been extensively involved in complex digital transformation projects. He is also passionate about building innovative solutions around geospatial aspects of addresses.

AWS Weekly Roundup — Happy Lunar New Year, IaC generator, NFL’s digital athlete, AWS Cloud Clubs, and more — February 12, 2024

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-happy-lunar-new-year-iac-generator-nfls-digital-athlete-aws-cloud-clubs-and-more-february-12-2024/

Happy Lunar New Year! Wishing you a year filled with joy, success, and endless opportunities! May the Year of the Dragon bring uninterrupted connections and limitless growth 🐉 ☁

In case you missed it, here’s outstanding news you need to know as you plan your year in early 2024.

AWS was named as a Leader in the 2023 Magic Quadrant for Strategic Cloud Platform Services. AWS is the longest-running Magic Quadrant Leader, with Gartner naming AWS a Leader for the thirteenth consecutive year. See Sebastian’s blog post to learn more. AWS has been named a Leader for the ninth consecutive year in the 2023 Gartner Magic Quadrant for Cloud Database Management Systems, and we have been positioned highest for ability to execute by providing a comprehensive set of services for your data foundation across all workloads, use cases, and data types. See Rahul Pathak’s blog post to learn more.

AWS also has been named a Leader in data clean room technology according to the IDC MarketScape: Worldwide Data Clean Room Technology 2024 Vendor Assessment (January 2024). This report evaluated data clean room technology vendors for use cases across industries. See the AWS for Industries Blog channel post to learn more.

Last Week’s Launches
Here are some launches that got my attention:

A new Local Zone in Houston, Texas – Local Zones are an AWS infrastructure deployment that places compute, storage, database, and other select services closer to large population, industry, and IT centers where no AWS Region exists. AWS Local Zones are available in the US in 15 other metro areas and globally in an additional 17 metros areas, allowing you to deliver low-latency applications to end users worldwide. You can enable the new Local Zone in Houston (us-east-1-iah-2a) from the Zones tab in the Amazon EC2 console settings.

AWS CloudFormation IaC generator – You can generate a template using AWS resources provisioned in your account that are not already managed by CloudFormation. With this launch, you can onboard workloads to Infrastructure as Code (IaC) in minutes, eliminating weeks of manual effort. You can then leverage the IaC benefits of automation, safety, and scalability for the workloads. Use the template to import resources into CloudFormation or replicate resources in a new account or Region. See the user guide and blog post to learn more.

A new look-and-feel of Amazon Bedrock console – Amazon Bedrock now offers an enhanced console experience with updated UI improves usability, responsiveness, and accessibility with more seamless support for dark mode. To get started with the new experience, visit the Amazon Bedrock console.

2024-bedrock-visual-refresh

One-click WAF integration on ALB – Application Load Balancer (ALB) now supports console integration with AWS WAF that allows you to secure your applications behind ALB with a single click. This integration enables AWS WAF protections as a first line of defense against common web threats for your applications that use ALB. You can use this one-click security protection provided by AWS WAF from the integrated services section of the ALB console for both new and existing load balancers.

Up to 49% price reduction for AWS Fargate Windows containers on Amazon ECS – Windows containers running on Fargate are now billed per second for infrastructure and Windows Server licenses that their containerized application requests. Along with the infrastructure pricing for on-demand, we are also reducing the minimum billing duration for Windows containers to 5 minutes (from 15 minutes) for any Fargate Windows tasks starting February 1st, 2024 (12:00am UTC). The infrastructure pricing and minimum billing period changes will automatically reflect in your monthly AWS bill. For more information on the specific price reductions, see our pricing page.

Introducing Amazon Data Firehose – We are renaming Amazon Kinesis Data Firehose to Amazon Data Firehose. Amazon Data Firehose is the easiest way to capture, transform, and deliver data streams into Amazon S3, Amazon Redshift, Amazon OpenSearch Service, Splunk, Snowflake, and other 3rd party analytics services. The name change is effective in the AWS Management Console, documentations, and product pages.

AWS Transfer Family integrations with Amazon EventBridge – AWS Transfer Family now enables conditional workflows by publishing SFTP, FTPS, and FTP file transfer events in near real-time, SFTP connectors file transfer event notifications, and Applicability Statement 2 (AS2) transfer operations to Amazon EventBridge. You can orchestrate your file transfer and file-processing workflows in AWS using Amazon EventBridge, or any workflow orchestration service of your choice that integrates with these events.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
Some other updates and news that you might have missed:

NFL’s digital athlete in the Super Bowl – AWS is working with the National Football League (NFL) to take player health and safety to the next level. Using AI and machine learning, they are creating a precise picture of each player in training, practice, and games. You could see this technology in action, especially with the Super Bowl on the last Sunday!

Amazon’s commiting the responsible AI – On February 7, Amazon joined the U.S. Artificial Intelligence Safety Institute Consortium, established by the National Institute of Standards of Technology (NIST), to further our government and industry collaboration to advance safe and secure artificial intelligence (AI). Amazon will contribute compute credits to help develop tools to evaluate AI safety and help the institute set an interoperable and trusted foundation for responsible AI development and use.

Compliance updates in South Korea – AWS has completed the 2023 South Korea Cloud Service Providers (CSP) Safety Assessment Program, also known as the Regulation on Supervision on Electronic Financial Transactions (RSEFT) Audit Program. AWS is committed to helping our customers adhere to applicable regulations and guidelines, and we help ensure that our financial customers have a hassle-free experience using the cloud. Also, AWS has successfully renewed certification under the Korea Information Security Management System (K-ISMS) standard (effective from December 16, 2023, to December 15, 2026).

Join AWS Cloud Clubs CaptainsAWS Cloud Clubs are student-led user groups for post-secondary level students and independent learners. Interested in founding or co-founding a Cloud Club in your university or region? We are accepting applications from February 5-18, 2024.

Upcoming AWS Events
Check your calendars and sign up for upcoming AWS events:

AWS Innovate AI/ML and Data Edition – Join our free online conference to learn how you and your organization can leverage the latest advances in generative AI. You can register upcoming AWS Innovate Online event that fits your timezone in Asia Pacific & Japan (February 22), EMEA (February 29), and Americas (March 14).

AWS Public Sector events – Join us at the AWS Public Sector Symposium Brussels (March 12) to discover how the AWS Cloud can help you improve resiliency, develop sustainable solutions, and achieve your mission. AWS Public Sector Day London (March 19) gathers professionals from government, healthcare, and education sectors to tackle pressing challenges in United Kingdom public services.

Kicking off AWS Global Summits – AWS Summits are a series of free online and in-person events that bring the cloud computing community together to connect, collaborate, and learn about AWS. Below is a list of available AWS Summit events taking place in April:

You can browse all upcoming AWS-led in-person and virtual events, and developer-focused events such as AWS DevDay.

That’s all for this week. Check back next Monday for another Week in Review!

— Channy

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS!

Run Kinesis Agent on Amazon ECS

Post Syndicated from Buddhike de Silva original https://aws.amazon.com/blogs/big-data/run-kinesis-agent-on-amazon-ecs/

Kinesis Agent is a standalone Java software application that offers a straightforward way to collect and send data to Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose. The agent continuously monitors a set of files and sends new data to the desired destination. The agent handles file rotation, checkpointing, and retry upon failures. It delivers all of your data in a reliable, timely, and simple manner. It also emits Amazon CloudWatch metrics to help you better monitor and troubleshoot the streaming process.

This post describes the steps to send data from a containerized application to Kinesis Data Firehose using Kinesis Agent. More specifically, we show how to run Kinesis Agent as a sidecar container for an application running in Amazon Elastic Container Service (Amazon ECS). After the data is in Kinesis Data Firehose, it can be sent to any supported destination, such as Amazon Simple Storage Service (Amazon S3).

In order to present the key points required for this setup, we assume that you are familiar with Amazon ECS and working with containers. We also avoid the implementation details and packaging process of our test data generation application, referred to as the producer.

Solution overview

As depicted in the following figure, we configure a Kinesis Agent container as a sidecar that can read files created by the producer container. In this instance, the producer and Kinesis Agent containers share data via a bind mount in Amazon ECS.

Solution design diagram

Prerequisites

You should satisfy the following prerequisites for the successful completion of this task:

With these prerequisites in place, you can begin next step to package a Kinesis Agent and your desired agent configuration as a container in your local development machine.

Create a Kinesis Agent configuration file

We use the Kinesis Agent configuration file to configure the source and destination, among other data transfer settings. The following code uses the minimal configuration required to read the contents of files matching /var/log/producer/*.log and publish them to a Kinesis Data Firehose delivery stream called kinesis-agent-demo:

{
    "firehose.endpoint": "firehose.ap-southeast-2.amazonaws.com",
    "flows": [
        {
            "deliveryStream": "kinesis-agent-demo",
            "filePattern": "/var/log/producer/*.log"
        }
    ]
}

Create a container image for Kinesis Agent

To deploy Kinesis Agent as a sidecar in Amazon ECS, you first have to package it as a container image. The container must have Kinesis Agent, which and find binaries, and the Kinesis Agent configuration file that you prepared earlier. Its entry point must be configured using the start-aws-kinesis-agent script. This command is installed when you run the yum install aws-kinesis-agent step. The resulting Dockerfile should look as follows:

FROM amazonlinux

RUN yum install -y aws-kinesis-agent which findutils
COPY agent.json /etc/aws-kinesis/agent.json

CMD ["start-aws-kinesis-agent"]

Run the docker build command to build this container:

docker build -t kinesis-agent .

After the image is built, it should be pushed to a container registry like Amazon ECR so that you can reference it in the next section.

Create an ECS task definition with Kinesis Agent and the application container

Now that you have Kinesis Agent packaged as a container image, you can use it in your ECS task definitions to run as sidecar. To do that, you create an ECS task definition with your application container (called producer) and Kinesis Agent container. All containers in a task definition are scheduled on the same container host and therefore can share resources such as bind mounts.

In the following sample container definition, we use a bind mount called logs_dir to share a directory between the producer container and kinesis-agent container.

You can use the following template as a starting point, but be sure to change taskRoleArn and executionRoleArn to valid IAM roles in your AWS account. In this instance, the IAM role used for taskRoleArn must have write permissions to Kinesis Data Firehose that you specified earlier in the agent.json file. Additionally, make sure that the ECR image paths and awslogs-region are modified as per your AWS account.

{
    "family": "kinesis-agent-demo",
    "taskRoleArn": "arn:aws:iam::111111111:role/kinesis-agent-demo-task-role",
    "executionRoleArn": "arn:aws:iam::111111111:role/kinesis-agent-test",
    "networkMode": "awsvpc",
    "containerDefinitions": [
        {
            "name": "producer",
            "image": "111111111.dkr.ecr.ap-southeast-2.amazonaws.com/producer:latest",
            "cpu": 1024,
            "memory": 2048,
            "essential": true,
            "command": [
                "-output",
                "/var/log/producer/test.log"
            ],
            "mountPoints": [
                {
                    "sourceVolume": "logs_dir",
                    "containerPath": "/var/log/producer",
                    "readOnly": false
                }
            ],
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-create-group": "true",
                    "awslogs-group": "producer",
                    "awslogs-stream-prefix": "producer",
                    "awslogs-region": "ap-southeast-2"
                }
            }
        },
        {
            "name": "kinesis-agent",
            "image": "111111111.dkr.ecr.ap-southeast-2.amazonaws.com/kinesis-agent:latest",
            "cpu": 1024,
            "memory": 2048,
            "essential": true,
            "mountPoints": [
                {
                    "sourceVolume": "logs_dir",
                    "containerPath": "/var/log/producer",
                    "readOnly": true
                }
            ],
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-create-group": "true",
                    "awslogs-group": "kinesis-agent",
                    "awslogs-stream-prefix": "kinesis-agent",
                    "awslogs-region": "ap-southeast-2"
                }
            }
        }
    ],
    "volumes": [
        {
            "name": "logs_dir"
        }
    ],
    "requiresCompatibilities": [
        "FARGATE"
    ],
    "cpu": "2048",
    "memory": "4096"
}

Register the task definition with the following command:

aws ecs register-task-definition --cli-input-json file://./task-definition.json

Run a new ECS task

Finally, you can run a new ECS task using the task definition you just created using the aws ecs run-task command. When the task is started, you should be able to see two containers running under that task on the Amazon ECS console.

Amazon ECS console screenshot

Conclusion

This post showed how straightforward it is to run Kinesis Agent in a containerized environment. Although we used Amazon ECS as our container orchestration service in this post, you can use a Kinesis Agent container in other environments such as Amazon Elastic Kubernetes Service (Amazon EKS).

To learn more about using Kinesis Agent, refer to Writing to Amazon Kinesis Data Streams Using Kinesis Agent. For more information about Amazon ECS, refer to the Amazon ECS Developer Guide.


About the Author

Buddhike de Silva is a Senior Specialist Solutions Architect at Amazon Web Services. Buddhike helps customers run large scale streaming analytics workloads on AWS and make the best out of their cloud journey.

Amazon MSK Introduces Managed Data Delivery from Apache Kafka to Your Data Lake

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/amazon-msk-introduces-managed-data-delivery-from-apache-kafka-to-your-data-lake/

I’m excited to announce today a new capability of Amazon Managed Streaming for Apache Kafka (Amazon MSK) that allows you to continuously load data from an Apache Kafka cluster to Amazon Simple Storage Service (Amazon S3). We use Amazon Kinesis Data Firehose—an extract, transform, and load (ETL) service—to read data from a Kafka topic, transform the records, and write them to an Amazon S3 destination. Kinesis Data Firehose is entirely managed and you can configure it with just a few clicks in the console. No code or infrastructure is needed.

Kafka is commonly used for building real-time data pipelines that reliably move massive amounts of data between systems or applications. It provides a highly scalable and fault-tolerant publish-subscribe messaging system. Many AWS customers have adopted Kafka to capture streaming data such as click-stream events, transactions, IoT events, and application and machine logs, and have applications that perform real-time analytics, run continuous transformations, and distribute this data to data lakes and databases in real time.

However, deploying Kafka clusters is not without challenges.

The first challenge is to deploy, configure, and maintain the Kafka cluster itself. This is why we released Amazon MSK in May 2019. MSK reduces the work needed to set up, scale, and manage Apache Kafka in production. We take care of the infrastructure, freeing you to focus on your data and applications. The second challenge is to write, deploy, and manage application code that consumes data from Kafka. It typically requires coding connectors using the Kafka Connect framework and then deploying, managing, and maintaining a scalable infrastructure to run the connectors. In addition to the infrastructure, you also must code the data transformation and compression logic, manage the eventual errors, and code the retry logic to ensure no data is lost during the transfer out of Kafka.

Today, we announce the availability of a fully managed solution to deliver data from Amazon MSK to Amazon S3 using Amazon Kinesis Data Firehose. The solution is serverless–there is no server infrastructure to manage–and requires no code. The data transformation and error-handling logic can be configured with a few clicks in the console.

The architecture of the solution is illustrated by the following diagram.

Amazon MSK to Amazon S3 architecture diagram

Amazon MSK is the data source, and Amazon S3 is the data destination while Amazon Kinesis Data Firehose manages the data transfer logic.

When using this new capability, you no longer need to develop code to read your data from Amazon MSK, transform it, and write the resulting records to Amazon S3. Kinesis Data Firehose manages the reading, the transformation and compression, and the write operations to Amazon S3. It also handles the error and retry logic in case something goes wrong. The system delivers the records that can not be processed to the S3 bucket of your choice for manual inspection. The system also manages the infrastructure required to handle the data stream. It will scale out and scale in automatically to adjust to the volume of data to transfer. There are no provisioning or maintenance operations required on your side.

Kinesis Data Firehose delivery streams support both public and private Amazon MSK provisioned or serverless clusters. It also supports cross-account connections to read from an MSK cluster and to write to S3 buckets in different AWS accounts. The Data Firehose delivery stream reads data from your MSK cluster, buffers the data for a configurable threshold size and time, and then writes the buffered data to Amazon S3 as a single file. MSK and Data Firehose must be in the same AWS Region, but Data Firehose can deliver data to Amazon S3 buckets in other Regions.

Kinesis Data Firehose delivery streams can also convert data types. It has built-in transformations to support JSON to Apache Parquet and Apache ORC formats. These are columnar data formats that save space and enable faster queries on Amazon S3. For non-JSON data, you can use AWS Lambda to transform input formats such as CSV, XML, or structured text into JSON before converting the data to Apache Parquet/ORC. Additionally, you can specify data compression formats from Data Firehose, such as GZIP, ZIP, and SNAPPY, before delivering the data to Amazon S3, or you can deliver the data to Amazon S3 in its raw form.

Let’s See How It Works
To get started, I use an AWS account where there’s an Amazon MSK cluster already configured and some applications streaming data to it. To get started and to create your first Amazon MSK cluster, I encourage you to read the tutorial.

Amazon MSK - List of existing clusters

For this demo, I use the console to create and configure the data delivery stream. Alternatively, I can use the AWS Command Line Interface (AWS CLI), AWS SDKs, AWS CloudFormation, or Terraform.

I navigate to the Amazon Kinesis Data Firehose page of the AWS Management Console and then choose Create delivery stream.

Kinesis Data Firehose - Main console page

I select Amazon MSK as a data Source and Amazon S3 as a delivery Destination. For this demo, I want to connect to a private cluster, so I select Private bootstrap brokers under Amazon MSK cluster connectivity.

I need to enter the full ARN of my cluster. Like most people, I cannot remember the ARN, so I choose Browse and select my cluster from the list.

Finally, I enter the cluster Topic name I want this delivery stream to read from.

Configure the delivery stream

After the source is configured, I scroll down the page to configure the data transformation section.

On the Transform and convert records section, I can choose whether I want to provide my own Lambda function to transform records that aren’t in JSON or to transform my source JSON records to one of the two available pre-built destination data formats: Apache Parquet or Apache ORC.

Apache Parquet and ORC formats are more efficient than JSON format to query data from Amazon S3. You can select these destination data formats when your source records are in JSON format. You must also provide a data schema from a table in AWS Glue.

These built-in transformations optimize your Amazon S3 cost and reduce time-to-insights when downstream analytics queries are performed with Amazon Athena, Amazon Redshift Spectrum, or other systems.

Configure the data transformation in the delivery stream

Finally, I enter the name of the destination Amazon S3 bucket. Again, when I cannot remember it, I use the Browse button to let the console guide me through my list of buckets. Optionally, I enter an S3 bucket prefix for the file names. For this demo, I enter aws-news-blog. When I don’t enter a prefix name, Kinesis Data Firehose uses the date and time (in UTC) as the default value.

Under the Buffer hints, compression and encryption section, I can modify the default values for buffering, enable data compression, or select the KMS key to encrypt the data at rest on Amazon S3.

When ready, I choose Create delivery stream. After a few moments, the stream status changes to ✅  available.

Select the destination S3 bucket

Assuming there’s an application streaming data to the cluster I chose as a source, I can now navigate to my S3 bucket and see data appearing in the chosen destination format as Kinesis Data Firehose streams it.

S3 bucket browsers shows the files streamed from MSK

As you see, no code is required to read, transform, and write the records from my Kafka cluster. I also don’t have to manage the underlying infrastructure to run the streaming and transformation logic.

Pricing and Availability.
This new capability is available today in all AWS Regions where Amazon MSK and Kinesis Data Firehose are available.

You pay for the volume of data going out of Amazon MSK, measured in GB per month. The billing system takes into account the exact record size; there is no rounding. As usual, the pricing page has all the details.

I can’t wait to hear about the amount of infrastructure and code you’re going to retire after adopting this new capability. Now go and configure your first data stream between Amazon MSK and Amazon S3 today.

— seb

Perform Amazon Kinesis load testing with Locust

Post Syndicated from Luis Morales original https://aws.amazon.com/blogs/big-data/perform-amazon-kinesis-load-testing-with-locust/

Building a streaming data solution requires thorough testing at the scale it will operate in a production environment. Streaming applications operating at scale often handle large volumes of up to GBs per second, and it’s challenging for developers to simulate high-traffic Amazon Kinesis-based applications to generate such load easily.

Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose are capable of capturing and storing terabytes of data per hour from numerous sources. Creating Kinesis data streams or Firehose delivery streams is straightforward through the AWS Management Console, AWS Command Line Interface (AWS CLI), or Kinesis API. However, generating a continuous stream of test data requires a custom process or script to run continuously. Although the Amazon Kinesis Data Generator (KDG) provides a user-friendly UI for this purpose, it has some limitations, such as bandwidth constraints and increased round trip latency. (For more information on the KDG, refer to Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator.)

To overcome these limitations, this post describes how to use Locust, a modern load testing framework, to conduct large-scale load testing for a more comprehensive evaluation of the streaming data solution.

Overview

This project emits temperature sensor readings via Locust to Kinesis. We set up the Amazon Elastic Compute Cloud (Amazon EC2) Locust instance via the AWS Cloud Development Kit (AWS CDK) to load test Kinesis-based applications. You can access the Locust dashboard to perform and observe the load test and connect via Session Manager, a capability of AWS Systems Manager, for configuration changes. The following diagram illustrates this architecture.

Architecture overview

In our testing with the largest recommended instance (c7g.16xlarge), the setup was capable of emitting over 1 million events per second to Kinesis data streams in on-demand capacity mode, with a batch size (simulated users per Locust user) of 500. You can find more details on what this means and how to configure the load test later in this post.

Locust overview

Locust is an open-source, scriptable, and scalable performance testing tool that allows you to define user behavior using Python code. It offers an easy-to-use interface, making it developer-friendly and highly expandable. With its distributed and scalable design, Locust can simulate millions of simultaneous users to mimic real user behavior during a performance test.

Each Locust user represents a scenario or a specific set of actions that a real user might perform on your system. When you run a performance test with Locust, you can specify the number of concurrent Locust users you want to simulate, and Locust will create an instance for each user, allowing you to assess the performance and behavior of your system under different user loads.

For more information on Locust, refer to the Locust documentation.

Prerequisites

To get started, clone or download the code from the GitHub repository.

Test locally

To test Locust out locally first before deploying it to the cloud, you have to install the necessary Python dependencies. If you’re new to Python, refer the README for more information on getting started.

Navigate to the load-test directory and run the following code:

pip install -r requirements.txt

To send events to a Kinesis data stream from your local machine, you will need to have AWS credentials. For more information, refer to Configuration and credential file settings.

To perform the test locally, stay in the load-test directory and run the following code:

locust -f locust-load-test.py

You can now access the Locust dashboard via http://0.0.0.0:8089/. Enter the number of Locust users, the spawn rate (users added per second), and the target Amazon Kinesis data stream name for Host. By default, it deploys the Kinesis data stream DemoStream that you can use for testing.

Locust Dashboard - Enter details

To see the generated events logged, run the following command, which filters only Locust and root logs (for example, no Botocore logs):

locust -f locust-load-test.py --loglevel DEBUG 2&gt;&amp;1 | grep -E "(locust|root)"

Set up resources with the AWS CDK

The GitHub repository contains the AWS CDK code to create all the necessary resources for the load test. This removes opportunities for manual error, increases efficiency, and ensures consistent configurations over time. To deploy the resources, complete the following steps:

  1. If not already downloaded, clone the GitHub repository to your local computer using the following command:
git clone https://github.com/aws-samples/amazon-kinesis-load-testing-with-locust
  1. Download and install the latest Node.js.
  2. Navigate to the root folder of the project and run the following command to install the latest version of AWS CDK:
npm install -g aws-cdk
  1. Install the necessary dependencies:
npm install
  1. Run cdk bootstrap to initialize the AWS CDK environment in your AWS account. Replace your AWS account ID and Region before running the following command:
cdk bootstrap

To learn more about the bootstrapping process, refer to Bootstrapping.

  1. After the dependencies are installed, you can run the following command to deploy the stack of the AWS CDK template, which sets up the infrastructure within 5 minutes:
cdk deploy

The template sets up the Locust EC2 test instance, which is by default a c7g.xlarge instance, which at the time of publishing costs approximately $0.145 per hour in us-east-1. To find the most accurate pricing information, see Amazon EC2 On-Demand Pricing. You can find more details on how to change your instance size according to your scale of load testing later in this post.

It’s crucial to consider that the expenses incurred during load testing are not solely attributed to EC2 instance costs, but also heavily influenced by data transfer costs.

Accessing the Locust dashboard

You can access the dashboard by using the AWS CDK output KinesisLocustLoadTestingStack.locustdashboardurl to open the dashboard, for example http://1.2.3.4:8089.

The Locust dashboard is password protected. By default, it’s set to user name locust-user and password locust-dashboard-pwd.

With the default configuration, you can achieve up to 15,000 emitted events per second. Enter the number of Locust users (times the batch size), the spawn rate (users added per second), and the target Kinesis data stream name for Host.

Locust Dashboard - Enter details

After you have started the load test, you can look at the load test on the Charts tab.

Locust Dashboard - Charts

You can also monitor the load test on the Kinesis Data Streams console by navigating to the stream that you are load testing. If you used the default settings, navigate to DemoStream. On the detail page, choose the Monitoring tab to see the ingested load.

Kinesis Data Streams - Monitoring

Adapt workloads

By default, this project generates random temperature sensor readings for every sensor with the following format:

{
    "sensorId": "bfbae19c-2f0f-41c2-952b-5d5bc6e001f1_1",
    "temperature": 147.24,
    "status": "OK",
    "timestamp": 1675686126310
}

The project comes packaged with Faker, which you can use to adapt the payload to your needs. You just have to update the generate_sensor_reading function in the locust-load-test.py file:

class SensorAPIUser(KinesisBotoUser):
    # ...

    def generate_sensor_reading(self, sensor_id, sensor_reading):
        current_temperature = round(10 + random.random() * 170, 2)

        if current_temperature > 160:
            status = "ERROR"
        elif current_temperature > 140 or random.randrange(1, 100) > 80:
            status = random.choice(["WARNING", "ERROR"])
        else:
            status = "OK"

        return {
            'sensorId': f"{sensor_id}_{sensor_reading}",
            'temperature': current_temperature,
            'status': status,
            'timestamp': round(time.time()*1000)
        }

    # ...

Change configurations

After the initial deployment of the load testing tool, you can change configuration in two ways:

  1. Connect to the EC2 instance, make any configuration and code changes, and restart the Locust process
  2. Change the configuration and load testing code locally and redeploy it via cdk deploy

The first option helps you iterate more quickly on the remote instance without a need to redeploy. The latter uses the infrastructure as code (IaC) approach and makes sure that your configuration changes can be committed to your source control system. For a fast development cycle, it’s recommended to test your load test configuration locally first, connect to your instance to apply the changes, and after successful implementation, codify it as part of your IaC repository and then redeploy.

Locust is created on the EC2 instance as a systemd service and can therefore be controlled with systemctl. If you want to change the configuration of Locust as needed without redeploying the stack, you can connect to the instance via Systems Manager, navigate to the project directory on /usr/local/load-test, change the locust.env file, and restart the service by running sudo systemctl restart locust.

Large-scale load testing

This setup is capable of emitting over 1 million events per second to Kinesis data stream, with a batch size of 500 and 64 secondaries on a c7g.16xlarge.

To achieve peak performance with Locust and Kinesis, keep the following in mind:

  • Instance size – Your performance is bound by the underlying EC2 instance, so refer to EC2 instance type for more information about scaling. To set the correct instance size, you can configure the instance size in the file kinesis-locust-load-testing.ts.
  • Number of secondaries – Locust benefits from a distributed setup. Therefore, the setup spins up a primary, which does the coordination, and multiple secondaries, which do the actual work. To fully take advantage of the cores, you should specify one secondary per core. You can configure the number in the locust.env file.
  • Batch size – The amount of Kinesis data stream events you can send per Locust user is limited due to the resource overhead of switching Locust users and threads. To overcome this, you can configure a batch size to define how much users are simulated per Locust user. These are sent as a Kinesis data stream put_records call. You can configure the number in the locust.env file.

This setup is capable of emitting over 1 million events per second to the Kinesis data stream, with a batch size of 500 and 64 secondaries on a c7g.16xlarge instance.

Locust Dashboard - Large Scale Load Test Charts

You can observe this on the Monitoring tab for the Kinesis data stream as well.

Kinesis Data Stream - Large Scale Load Test Monitoring

Clean up

In order to not incur any unnecessary costs, delete the stack by running the following code:

cdk destroy

Summary

Kinesis is already popular for its ease of use among users building streaming applications. With this load testing capability using Locust, you can now test your workloads in a more straightforward and faster way. Visit the GitHub repo to embark on your testing journey.

The project is licensed under the Apache 2.0 license, providing the freedom to clone and modify it according to your needs. Furthermore, you can contribute to the project by submitting issues or pull requests via GitHub, fostering collaboration and improvement in the testing ecosystem.


About the author

Luis Morales works as Senior Solutions Architect with digital native businesses to support them in constantly reinventing themselves in the cloud. He is passionate about software engineering, cloud-native distributed systems, test-driven development, and all things code and security

Migrate from Amazon Kinesis Data Analytics for SQL Applications to Amazon Kinesis Data Analytics Studio

Post Syndicated from Nicholas Tunney original https://aws.amazon.com/blogs/big-data/migrate-from-amazon-kinesis-data-analytics-for-sql-applications-to-amazon-kinesis-data-analytics-studio/

Amazon Kinesis Data Analytics makes it easy to transform and analyze streaming data in real time.

In this post, we discuss why AWS recommends moving from Kinesis Data Analytics for SQL Applications to Amazon Kinesis Data Analytics for Apache Flink to take advantage of Apache Flink’s advanced streaming capabilities. We also show how to use Kinesis Data Analytics Studio to test and tune your analysis before deploying your migrated applications. If you don’t have any Kinesis Data Analytics for SQL applications, this post still provides a background on many of the use cases you’ll see in your data analytics career and how Amazon Data Analytics services can help you achieve your objectives.

Kinesis Data Analytics for Apache Flink is a fully managed Apache Flink service. You only need to upload your application JAR or executable, and AWS will manage the infrastructure and Flink job orchestration. To make things simpler, Kinesis Data Analytics Studio is a notebook environment that uses Apache Flink and allows you to query data streams and develop SQL queries or proof of concept workloads before scaling your application to production in minutes.

We recommend that you use Kinesis Data Analytics for Apache Flink or Kinesis Data Analytics Studio over Kinesis Data Analytics for SQL. This is because Kinesis Data Analytics for Apache Flink and Kinesis Data Analytics Studio offer advanced data stream processing features, including exactly-once processing semantics, event time windows, extensibility using user-defined functions (UDFs) and custom integrations, imperative language support, durable application state, horizontal scaling, support for multiple data sources, and more. These are critical for ensuring accuracy, completeness, consistency, and reliability of data stream processing and are not available with Kinesis Data Analytics for SQL.

Solution overview

For our use case, we use several AWS services to stream, ingest, transform, and analyze sample automotive sensor data in real time using Kinesis Data Analytics Studio. Kinesis Data Analytics Studio allows us to create a notebook, which is a web-based development environment. With notebooks, you get a simple interactive development experience combined with the advanced capabilities provided by Apache Flink. Kinesis Data Analytics Studio uses Apache Zeppelin as the notebook, and uses Apache Flink as the stream processing engine. Kinesis Data Analytics Studio notebooks seamlessly combine these technologies to make advanced analytics on data streams accessible to developers of all skill sets. Notebooks are provisioned quickly and provide a way for you to instantly view and analyze your streaming data. Apache Zeppelin provides your Studio notebooks with a complete suite of analytics tools, including the following:

  • Data visualization
  • Exporting data to files
  • Controlling the output format for easier analysis
  • Ability to turn the notebook into a scalable, production application

Unlike Kinesis Data Analytics for SQL Applications, Kinesis Data Analytics for Apache Flink adds the following SQL support:

  • Joining stream data between multiple Kinesis data streams, or between a Kinesis data stream and an Amazon Managed Streaming for Apache Kafka (Amazon MSK) topic
  • Real-time visualization of transformed data in a data stream
  • Using Python scripts or Scala programs within the same application
  • Changing offsets of the streaming layer

Another benefit of Kinesis Data Analytics for Apache Flink is the improved scalability of the solution once deployed, because you can scale the underlying resources to meet demand. In Kinesis Data Analytics for SQL Applications, scaling is performed by adding more pumps to persuade the application into adding more resources.

In our solution, we create a notebook to access automotive sensor data, enrich the data, and send the enriched output from the Kinesis Data Analytics Studio notebook to an Amazon Kinesis Data Firehose delivery stream for delivery to an Amazon Simple Storage Service (Amazon S3) data lake. This pipeline could further be used to send data to Amazon OpenSearch Service or other targets for additional processing and visualization.

Kinesis Data Analytics for SQL Applications vs. Kinesis Data Analytics for Apache Flink

In our example, we perform the following actions on the streaming data:

  1. Connect to an Amazon Kinesis Data Streams data stream.
  2. View the stream data.
  3. Transform and enrich the data.
  4. Manipulate the data with Python.
  5. Restream the data to a Firehose delivery stream.

To compare Kinesis Data Analytics for SQL Applications with Kinesis Data Analytics for Apache Flink, let’s first discuss how Kinesis Data Analytics for SQL Applications works.

At the root of a Kinesis Data Analytics for SQL application is the concept of an in-application stream. You can think of the in-application stream as a table that holds the streaming data so you can perform actions on it. The in-application stream is mapped to a streaming source such as a Kinesis data stream. To get data into the in-application stream, first set up a source in the management console for your Kinesis Data Analytics for SQL application. Then, create a pump that reads data from the source stream and places it into the table. The pump query runs continuously and feeds the source data into the in-application stream. You can create multiple pumps from multiple sources to feed the in-application stream. Queries are then run on the in-application stream, and results can be interpreted or sent to other destinations for further processing or storage.

The following SQL demonstrates setting up an in-application stream and pump:

CREATE OR REPLACE STREAM "TEMPSTREAM" ( 
   "column1" BIGINT NOT NULL, 
   "column2" INTEGER, 
   "column3" VARCHAR(64));

CREATE OR REPLACE PUMP "SAMPLEPUMP" AS 
INSERT INTO "TEMPSTREAM" ("column1", 
                          "column2", 
                          "column3") 
SELECT STREAM inputcolumn1, 
      inputcolumn2, 
      inputcolumn3
FROM "INPUTSTREAM";

Data can be read from the in-application stream using a SQL SELECT query:

SELECT *
FROM "TEMPSTREAM"

When creating the same setup in Kinesis Data Analytics Studio, you use the underlying Apache Flink environment to connect to the streaming source, and create the data stream in one statement using a connector. The following example shows connecting to the same source we used before, but using Apache Flink:

CREATE TABLE `MY_TABLE` ( 
   "column1" BIGINT NOT NULL, 
   "column2" INTEGER, 
   "column3" VARCHAR(64)
) WITH (
   'connector' = 'kinesis',
   'stream' = sample-kinesis-stream',
   'aws.region' = 'aws-kinesis-region',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json'
 );

MY_TABLE is now a data stream that will continually receive the data from our sample Kinesis data stream. It can be queried using a SQL SELECT statement:

SELECT column1, 
       column2, 
       column3
FROM MY_TABLE;

Although Kinesis Data Analytics for SQL Applications use a subset of the SQL:2008 standard with extensions to enable operations on streaming data, Apache Flink’s SQL support is based on Apache Calcite, which implements the SQL standard.

It’s also important to mention that Kinesis Data Analytics Studio supports PyFlink and Scala alongside SQL within the same notebook. This allows you to perform complex, programmatic methods on your streaming data that aren’t possible with SQL.

Prerequisites

During this exercise, we set up various AWS resources and perform analytics queries. To follow along, you need an AWS account with administrator access. If you don’t already have an AWS account with administrator access, create one now. The services outlined in this post may incur charges to your AWS account. Make sure to follow the cleanup instructions at the end of this post.

Configure streaming data

In the streaming domain, we’re often tasked with exploring, transforming, and enriching data coming from Internet of Things (IoT) sensors. To generate the real-time sensor data, we employ the AWS IoT Device Simulator. This simulator runs within your AWS account and provides a web interface that lets users launch fleets of virtually connected devices from a user-defined template and then simulate them to publish data at regular intervals to AWS IoT Core. This means we can build a virtual fleet of devices to generate sample data for this exercise.

We deploy the IoT Device Simulator using the following Amazon CloudFront template. It handles creating all the necessary resources in your account.

  1. On the Specify stack details page, assign a name to your solution stack.
  2. Under Parameters, review the parameters for this solution template and modify them as necessary.
  3. For User email, enter a valid email to receive a link and password to log in to the IoT Device Simulator UI.
  4. Choose Next.
  5. On the Configure stack options page, choose Next.
  6. On the Review page, review and confirm the settings. Select the check boxes acknowledging that the template creates AWS Identity and Access Management (IAM) resources.
  7. Choose Create stack.

The stack takes about 10 minutes to install.

  1. When you receive your invitation email, choose the CloudFront link and log in to the IoT Device Simulator using the credentials provided in the email.

The solution contains a prebuilt automotive demo that we can use to begin delivering sensor data quickly to AWS.

  1. On the Device Type page, choose Create Device Type.
  2. Choose Automotive Demo.
  3. The payload is auto populated. Enter a name for your device, and enter automotive-topic as the topic.
  4. Choose Save.

Now we create a simulation.

  1. On the Simulations page, choose Create Simulation.
  2. For Simulation type, choose Automotive Demo.
  3. For Select a device type, choose the demo device you created.
  4. For Data transmission interval and Data transmission duration, enter your desired values.

You can enter any values you like, but use at least 10 devices transmitting every 10 seconds. You’ll want to set your data transmission duration to a few minutes, or you’ll need to restart your simulation several times during the lab.

  1. Choose Save.

Now we can run the simulation.

  1. On the Simulations page, select the desired simulation, and choose Start simulations.

Alternatively, choose View next to the simulation you want to run, then choose Start to run the simulation.

  1. To view the simulation, choose View next to the simulation you want to view.

If the simulation is running, you can view a map with the locations of the devices, and up to 100 of the most recent messages sent to the IoT topic.

We can now check to ensure our simulator is sending the sensor data to AWS IoT Core.

  1. Navigate to the AWS IoT Core console.

Make sure you’re in the same Region you deployed your IoT Device Simulator.

  1. In the navigation pane, choose MQTT Test Client.
  2. Enter the topic filter automotive-topic and choose Subscribe.

As long as you have your simulation running, the messages being sent to the IoT topic will be displayed.

Finally, we can set a rule to route the IoT messages to a Kinesis data stream. This stream will provide our source data for the Kinesis Data Analytics Studio notebook.

  1. On the AWS IoT Core console, choose Message Routing and Rules.
  2. Enter a name for the rule, such as automotive_route_kinesis, then choose Next.
  3. Provide the following SQL statement. This SQL will select all message columns from the automotive-topic the IoT Device Simulator is publishing:
SELECT timestamp, trip_id, VIN, brake, steeringWheelAngle, torqueAtTransmission, engineSpeed, vehicleSpeed, acceleration, parkingBrakeStatus, brakePedalStatus, transmissionGearPosition, gearLeverPosition, odometer, ignitionStatus, fuelLevel, fuelConsumedSinceRestart, oilTemp, location 
FROM 'automotive-topic' WHERE 1=1
  1. Choose Next.
  2. Under Rule Actions, select Kinesis Stream as the source.
  3. Choose Create New Kinesis Stream.

This opens a new window.

  1. For Data stream name, enter automotive-data.

We use a provisioned stream for this exercise.

  1. Choose Create Data Stream.

You may now close this window and return to the AWS IoT Core console.

  1. Choose the refresh button next to Stream name, and choose the automotive-data stream.
  2. Choose Create new role and name the role automotive-role.
  3. Choose Next.
  4. Review the rule properties, and choose Create.

The rule begins routing data immediately.

Set up Kinesis Data Analytics Studio

Now that we have our data streaming through AWS IoT Core and into a Kinesis data stream, we can create our Kinesis Data Analytics Studio notebook.

  1. On the Amazon Kinesis console, choose Analytics applications in the navigation pane.
  2. On the Studio tab, choose Create Studio notebook.
  3. Leave Quick create with sample code selected.
  4. Name the notebook automotive-data-notebook.
  5. Choose Create to create a new AWS Glue database in a new window.
  6. Choose Add database.
  7. Name the database automotive-notebook-glue.
  8. Choose Create.
  9. Return to the Create Studio notebook section.
  10. Choose refresh and choose your new AWS Glue database.
  11. Choose Create Studio notebook.
  12. To start the Studio notebook, choose Run and confirm.
  13. Once the notebook is running, choose the notebook and choose Open in Apache Zeppelin.
  14. Choose Import note.
  15. Choose Add from URL.
  16. Enter the following URL: https://aws-blogs-artifacts-public.s3.amazonaws.com/artifacts/BDB-2461/auto-notebook.ipynb.
  17. Choose Import Note.
  18. Open the new note.

Perform stream analysis

In a Kinesis Data Analytics for SQL application, we add our streaming course via the management console, and then define an in-application stream and pump to stream data from our Kinesis data stream. The in-application stream functions as a table to hold the data and make it available for us to query. The pump takes the data from our source and streams it to our in-application stream. Queries may then be run against the in-application stream using SQL, just as we’d query any SQL table. See the following code:

CREATE OR REPLACE STREAM "AUTOSTREAM" ( 
    `trip_id` CHAR(36),
    `VIN` CHAR(17),
    `brake` FLOAT,
    `steeringWheelAngle` FLOAT,
    `torqueAtTransmission` FLOAT,
    `engineSpeed` FLOAT,
    `vehicleSpeed` FLOAT,
    `acceleration` FLOAT,
    `parkingBrakeStatus` BOOLEAN,
    `brakePedalStatus` BOOLEAN,
    `transmissionGearPosition` VARCHAR(10),
    `gearLeverPosition` VARCHAR(10),
    `odometer` FLOAT,
    `ignitionStatus` VARCHAR(4),
    `fuelLevel` FLOAT,
    `fuelConsumedSinceRestart` FLOAT,
    `oilTemp` FLOAT,
    `location` VARCHAR(100),
    `timestamp` TIMESTAMP(3));

CREATE OR REPLACE PUMP "MYPUMP" AS 
INSERT INTO "AUTOSTREAM" ("trip_id",
    "VIN",
    "brake",
    "steeringWheelAngle",
    "torqueAtTransmission",
    "engineSpeed",
    "vehicleSpeed",
    "acceleration",
    "parkingBrakeStatus",
    "brakePedalStatus",
    "transmissionGearPosition",
    "gearLeverPosition",
    "odometer",
    "ignitionStatus",
    "fuelLevel",
    "fuelConsumedSinceRestart",
    "oilTemp",
    "location",
    "timestamp")
SELECT VIN,
    brake,
    steeringWheelAngle,
    torqueAtTransmission,
    engineSpeed,
    vehicleSpeed,
    acceleration,
    parkingBrakeStatus,
    brakePedalStatus,
    transmissionGearPosition,
    gearLeverPosition,
    odometer,
    ignitionStatus,
    fuelLevel,
    fuelConsumedSinceRestart,
    oilTemp,
    location,
    timestamp
FROM "INPUT_STREAM"

To migrate an in-application stream and pump from our Kinesis Data Analytics for SQL application to Kinesis Data Analytics Studio, we convert this into a single CREATE statement by removing the pump definition and defining a kinesis connector. The first paragraph in the Zeppelin notebook sets up a connector that is presented as a table. We can define columns for all items in the incoming message, or a subset.

Run the statement, and a success result is output in your notebook. We can now query this table using SQL, or we can perform programmatic operations with this data using PyFlink or Scala.

Before performing real-time analytics on the streaming data, let’s look at how the data is currently formatted. To do this, we run a simple Flink SQL query on the table we just created. The SQL used in our streaming application is identical to what is used in a SQL application.

Note that if you don’t see records after a few seconds, make sure that your IoT Device Simulator is still running.

If you’re also running the Kinesis Data Analytics for SQL code, you may see a slightly different result set. This is another key differentiator in Kinesis Data Analytics for Apache Flink, because the latter has the concept of exactly once delivery. If this application is deployed to production and is restarted or if scaling actions occur, Kinesis Data Analytics for Apache Flink ensures you only receive each message once, whereas in a Kinesis Data Analytics for SQL application, you need to further process the incoming stream to ensure you ignore repeat messages that could affect your results.

You can stop the current paragraph by choosing the pause icon. You may see an error displayed in your notebook when you stop the query, but it can be ignored. It’s just letting you know that the process was canceled.

Flink SQL implements the SQL standard, and provides an easy way to perform calculations on the stream data just like you would when querying a database table. A common task while enriching data is to create a new field to store a calculation or conversion (such as from Fahrenheit to Celsius), or create new data to provide simpler queries or improved visualizations downstream. Run the next paragraph to see how we can add a Boolean value named accelerating, which we can easily use in our sink to know if an automobile was currently accelerating at the time the sensor was read. The process here doesn’t differ between Kinesis Data Analytics for SQL and Kinesis Data Analytics for Apache Flink.

You can stop the paragraph from running when you have inspected the new column, comparing our new Boolean value to the FLOAT acceleration column.

Data being sent from a sensor is usually compact to improve latency and performance. Being able to enrich the data stream with external data and enrich the stream, such as additional vehicle information or current weather data, can be very useful. In this example, let’s assume we want to bring in data currently stored in a CSV in Amazon S3, and add a column named color that reflects the current engine speed band.

Apache Flink SQL provides several source connectors for AWS services and other sources. Creating a new table like we did in our first paragraph but instead using the filesystem connector permits Flink to directly connect to Amazon S3 and read our source data. Previously in Kinesis Data Analytics for SQL Applications, you couldn’t add new references inline. Instead, you defined S3 reference data and added it to your application configuration, which you could then use as a reference in a SQL JOIN.

NOTE: If you are not using the us-east-1 region, you can download the csv and place the object your own S3 bucket.  Reference the csv file as s3a://<bucket-name>/<key-name>

Building on the last query, the next paragraph performs a SQL JOIN on our current data and the new lookup source table we created.

Now that we have an enriched data stream, we restream this data. In a real-world scenario, we have many choices on what to do with our data, such as sending the data to an S3 data lake, another Kinesis data stream for further analysis, or storing the data in OpenSearch Service for visualization. For simplicity, we send the data to Kinesis Data Firehose, which streams the data into an S3 bucket acting as our data lake.

Kinesis Data Firehose can stream data to Amazon S3, OpenSearch Service, Amazon Redshift data warehouses, and Splunk in just a few clicks.

Create the Kinesis Data Firehose delivery stream

To create our delivery stream, complete the following steps:

  1. On the Kinesis Data Firehose console, choose Create delivery stream.
  2. Choose Direct PUT for the stream source and Amazon S3 as the target.
  3. Name your delivery stream automotive-firehose.
  4. Under Destination settings, create a new bucket or use an existing bucket.
  5. Take note of the S3 bucket URL.
  6. Choose Create delivery stream.

The stream takes a few seconds to create.

  1. Return to the Kinesis Data Analytics console and choose Streaming applications.
  2. On the Studio tab, and choose your Studio notebook.
  3. Choose the link under IAM role.
  4. In the IAM window, choose Add permissions and Attach policies.
  5. Search for and select AmazonKinesisFullAccess and CloudWatchFullAccess, then choose Attach policy.
  6. You may return to your Zeppelin notebook.

Stream data into Kinesis Data Firehose

As of Apache Flink v1.15, creating the connector to the Firehose delivery stream works similar to creating a connector to any Kinesis data stream. Note that there are two differences: the connector is firehose, and the stream attribute becomes delivery-stream.

After the connector is created, we can write to the connector like any SQL table.

To validate that we’re getting data through the delivery stream, open the Amazon S3 console and confirm you see files being created. Open the file to inspect the new data.

In Kinesis Data Analytics for SQL Applications, we would have created a new destination in the SQL application dashboard. To migrate an existing destination, you add a SQL statement to your notebook that defines the new destination right in the code. You can continue to write to the new destination as you would have with an INSERT while referencing the new table name.

Time data

Another common operation you can perform in Kinesis Data Analytics Studio notebooks is aggregation over a window of time. This sort of data can be used to send to another Kinesis data stream to identify anomalies, send alerts, or be stored for further processing. The next paragraph contains a SQL query that uses a tumbling window and aggregates total fuel consumed for the automotive fleet for 30-second periods. Like our last example, we could connect to another data stream and insert this data for further analysis.

Scala and PyFlink

There are times when a function you’d perform on your data stream is better written in a programming language instead of SQL, for both simplicity and maintenance. Some examples include complex calculations that SQL functions don’t support natively, certain string manipulations, the splitting of data into multiple streams, and interacting with other AWS services (such as text translation or sentiment analysis). Kinesis Data Analytics for Apache Flink has the ability to use multiple Flink interpreters within the Zeppelin notebook, which is not available in Kinesis Data Analytics for SQL Applications.

If you have been paying close attention to our data, you’ll see that the location field is a JSON string. In Kinesis Data Analytics for SQL, we could use string functions and define a SQL function and break apart the JSON string. This is a fragile approach depending on the stability of the message data, but this could be improved with several SQL functions. The syntax for creating a function in Kinesis Data Analytics for SQL follows this pattern:

CREATE FUNCTION ''<function_name>'' ( ''<parameter_list>'' )
    RETURNS ''<data type>''
    LANGUAGE SQL
    [ SPECIFIC ''<specific_function_name>''  | [NOT] DETERMINISTIC ]
    CONTAINS SQL
    [ READS SQL DATA ]
    [ MODIFIES SQL DATA ]
    [ RETURNS NULL ON NULL INPUT | CALLED ON NULL INPUT ]  
  RETURN ''<SQL-defined function body>''

In Kinesis Data Analytics for Apache Flink, AWS recently upgraded the Apache Flink environment to v1.15, which extends Apache Flink SQL’s table SQL to add JSON functions that are similar to JSON Path syntax. This allows us to query the JSON string directly in our SQL. See the following code:

%flink.ssql(type=update)
SELECT JSON_STRING(location, ‘$.latitude) AS latitude,
JSON_STRING(location, ‘$.longitude) AS longitude
FROM my_table

Alternatively, and required prior to Apache Flink v1.15, we can use Scala or PyFlink in our notebook to convert the field and restream the data. Both languages provide robust JSON string handling.

The following PyFlink code defines two user-defined functions, which extract the latitude and longitude from the location field of our message. These UDFs can then be invoked from using Flink SQL. We reference the environment variable st_env. PyFlink creates six variables for you in your Zeppelin notebook. Zeppelin also exposes a context for you as the variable z.

Errors can also happen when messages contain unexpected data. Kinesis Data Analytics for SQL Applications provides an in-application error stream. These errors can then be processed separately and restreamed or dropped. With PyFlink in Kinesis Data Analytics Streaming applications, you can write complex error-handling strategies and immediately recover and continue processing the data. When the JSON string is passed into the UDF, it may be malformed, incomplete, or empty. By catching the error in the UDF, Python will always return a value even if an error would have occurred.

The following sample code shows another PyFlink snippet that performs a division calculation on two fields. If a division-by-zero error is encountered, it provides a default value so the stream can continue processing the message.

%flink.pyflink
@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def DivideByZero(price):    
	try:        
		price / 0        
	except:        
		return -1
st_env.register_function("DivideByZero", DivideByZero)

Next steps

Building a pipeline as we’ve done in this post gives us the base for testing additional services in AWS. I encourage you to continue your streaming analytics learning before tearing down the streams you created. Consider the following:

Clean up

To clean up the services created in this exercise, complete the following steps:

  1. Navigate to the CloudFormation Console and delete the IoT Device Simulator stack.
  2. On the AWS IoT Core console, choose Message Routing and Rules, and delete the rule automotive_route_kinesis.
  3. Delete the Kinesis data stream automotive-data in the Kinesis Data Stream console.
  4. Remove the IAM role automotive-role in the IAM Console.
  5. In the AWS Glue console, delete the automotive-notebook-glue database.
  6. Delete the Kinesis Data Analytics Studio notebook automotive-data-notebook.
  7. Delete the Firehose delivery stream automotive-firehose.

Conclusion

Thanks for following along with this tutorial on Kinesis Data Analytics Studio. If you’re currently using a legacy Kinesis Data Analytics Studio SQL application, I recommend you reach out to your AWS technical account manager or Solutions Architect and discuss migrating to Kinesis Data Analytics Studio. You can continue your learning path in our Amazon Kinesis Data Streams Developer Guide, and access our code samples on GitHub.


About the Author

Nicholas Tunney is a Partner Solutions Architect for Worldwide Public Sector at AWS. He works with global SI partners to develop architectures on AWS for clients in the government, nonprofit healthcare, utility, and education sectors.

Stream VPC Flow Logs to Datadog via Amazon Kinesis Data Firehose

Post Syndicated from Chaitanya Shah original https://aws.amazon.com/blogs/big-data/stream-vpc-flow-logs-to-datadog-via-amazon-kinesis-data-firehose/

It’s common to store the logs generated by customer’s applications and services in various tools. These logs are important for compliance, audits, troubleshooting, security incident responses, meeting security policies, and many other purposes. You can perform log analysis on these logs to understand users’ application behavior and patterns to make informed decisions.

When running workloads on Amazon Web Services (AWS), you need to analyze Amazon Virtual Private Cloud (Amazon VPC) Flow Logs to track the IP traffic going to and from the network interfaces for the workloads in their VPC. Analyzing VPC flow logs helps you understand how your applications are communicating over the VPC network and acts as a main source of information to the network in your VPC.

You can easily deliver data to supported destinations using the Amazon Kinesis Data Firehose integration with VPC flow logs. Kinesis Data Firehose is a fully managed service for delivering near-real-time streaming data to various destinations for storage and performing near-real-time analytics. With its extensible data transformation capabilities, you can also streamline log processing and log delivery pipelines into a single Kinesis Data Firehose delivery stream. You can perform analytics on VPC flow logs delivered from your VPC using the Kinesis Data Firehose integration with Datadog as a destination.

Datadog is a monitoring and security platform and AWS Partner Network (APN) Advanced Technology Partner with AWS Competencies in AWS Cloud Operations, DevOps, Migration, Security, Networking, Containers, and Microsoft Workloads, along with many others.

Datadog enables you to easily explore and analyze logs to gain deeper insights into the state of your applications and AWS infrastructure. You can analyze all your AWS service logs while storing only the ones you need, generate metrics from aggregated logs to uncover, and send alerts about trends in your AWS services.

In this post, you learn how to integrate VPC flow logs with Kinesis Data Firehose and deliver it to Datadog.

Solution overview

This solution uses native integration of VPC flow logs streaming to Kinesis Data Firehose. We use a Kinesis Data Firehose delivery stream to buffer the streamed VPC flow logs to a Datadog destination endpoint in your Datadog account. You can use these logs with Datadog Log Management and Datadog Cloud SIEM to analyze the health, performance, and security of your cloud resources.

The following diagram illustrates the solution architecture.

We walk you through the following high-level steps:

  1. Link your AWS account with your Datadog account.
  2. Create the Kinesis Data Firehose stream where VPC service streams the flow logs.
  3. Create the VPC flow log subscription to Kinesis Data Firehose.
  4. Visualize VPC flow logs in the Datadog dashboard.

The account ID 123456781234 used in this post is a dummy account. It is used only for demonstration purposes.

Prerequisites

You should have the following prerequisites:

Link your AWS account with your Datadog account for AWS integration

Follow the instructions provided on the Datadog website for AWS Integration. To configure log archiving and enrich the log data sent from your AWS account with useful context, link the accounts. When you complete the linking setup, proceed to the following step.

Create a Kinesis Data Firehose stream

Now that your Datadog integration with AWS is complete, you can create a Kinesis Data Firehose delivery stream where VPC Flow Logs are streamed by following these steps:

  1. On the Amazon Kinesis console, choose Kinesis Data Firehose in the navigation pane.
  2. Choose Create delivery stream.
  3. Choose Direct PUT as the source.
  4. Set Destination as Datadog.
    Create delivery stream
  1. For Delivery stream name, enter PUT-DATADOG-DEMO.
  2. Keep Data transformation set to Disabled under Transform records.
  3. In Destination settings, for HTTP endpoint URL, choose the desired log’s HTTP endpoint based on your Region and Datadog account configuration.
    Kinesis delivery stream configuration
  4. For API key, enter your Datadog API key.

This allows your delivery stream to publish VPC Flow logs to the Datadog endpoint. API keys are unique to your organization. An API key is required by the Datadog Agent to submit metrics and events to Datadog.

  1. Set Content encoding to GZIP to reduce the size of data transferred.
  2. Set the Retry duration to 60.You can change the Retry duration value if you need to. This depends on the request handling capacity of the Datadog endpoint.
    Kinesis destination settings
    Under Buffer hints, Buffer size and Buffer interval are set with default values for Datadog integration.
    Kinesis buffer settings
  1. Under Backup settings, as mentioned in the prerequisites, choose the S3 bucket that you created to store failed logs and backup with specific prefix.
  2. Under S3 buffer hints section, set Buffer size to 5 and Buffer interval to 300.

You can change the S3 buffer size and interval based on your requirements.

  1. Under S3 compression and encryption, select GZIP for Compression for data records or another compression method of your choice.

Compressing data reduces the required storage space.

  1. Select Disabled for Encryption of the data records. You can enable encryption of the data records to secure access to your logs.
    Kinesis stream backup settings
  1. Optionally, in Advanced settings, select Enable server-side encryption for source records in delivery stream.
    You can use AWS managed keys or a CMK managed by you for the encryption type.
  1. Enable CloudWatch error logging.
  2. Choose Create or update IAM role, which is created by Kinesis Data Firehose as part of this stream.
    Kinesis stream Advanced settings
  1. Choose Next.
  2. Review your settings.
  3. Choose Create delivery stream.

Create a VPC flow logs subscription

Create a VPC flow logs subscription for the Kinesis Data Firehose delivery stream you created in the previous step:

  1. On the Amazon VPC console, choose Your VPCs.
  2. Select the VPC that you to create the flow log for.
  3. On the Actions menu, choose Create flow log.
    AWS VPCs
  1. Select All to send all flow log records to the Firehose destination.

If you want to filter the flow logs, you could alternatively select Accept or Reject.

  1. For Maximum aggregation interval, select 10 minutes or the minimum setting of 1 minute if you need the flow log data to be available for near-real-time analysis in Datadog.
  2. For Destination, select Send to Kinesis Data Firehose in the same account if the delivery stream is set up on the same account where you create the VPC flow logs.

If you want to send the data to a different account, refer to Publish flow logs to Kinesis Data Firehose.

  1. Choose an option for Log record format:
  2. If you leave Log record format as the AWS default format, the flow logs are sent as version 2 format.
  3. Alternatively, you can specify the custom fields for flow logs to capture and send it to Datadog.

For more information on log format and available fields, refer to Flow log records.

  1. Choose Create flow log.
    Create VPC Flow log

Now let’s explore the VPC flow logs in Datadog.

Visualize VPC flow logs in the Datadog dashboard

In the Logs Search option in the navigation pane, filter to source:vpc. The VPC flow logs from your VPC are in the Datadog Log Explorer and are automatically parsed so you can analyze your logs by source, destination, action, or other attributes.

Datadog Logs Dashboard

Clean up

After you test this solution, delete all the resources you created to avoid incurring future charges. Refer to the following links for instructions for deleting the resources:

Conclusion

In this post, we walked through a solution of how to integrate VPC flow logs with a Kinesis Data Firehose delivery stream, deliver it to a Datadog destination with no code, and visualize it in a Datadog dashboard. With Datadog, you can easily explore and analyze logs to gain deeper insights into the state of your applications and AWS infrastructure.

Try this new, quick, and hassle-free way of sending your VPC flow logs to a Datadog destination using Kinesis Data Firehose.


About the Author

Chaitanya Shah - AWS Chaitanya Shah is a Sr. Technical Account Manager(TAM) with AWS, based out of New York. He has over 22 years of experience working with enterprise customers. He loves to code and actively contributes to the AWS solutions labs to help customers solve complex problems. He provides guidance to AWS customers on best practices for their AWS Cloud migrations. He is also specialized in AWS data transfer and the data and analytics domain.

Serverless ICYMI Q1 2023

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/serverless-icymi-q1-2023/

Welcome to the 21st edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all the most recent product launches, feature enhancements, blog posts, webinars, live streams, and other interesting things that you might have missed!

ICYMI2023Q1

In case you missed our last ICYMI, check out what happened last quarter here.

Artificial intelligence (AI) technologies, ChatGPT, and DALL-E are creating significant interest in the industry at the moment. Find out how to integrate serverless services with ChatGPT and DALL-E to generate unique bedtime stories for children.

Example notification of a story hosted with Next.js and App Runner

Example notification of a story hosted with Next.js and App Runner

Serverless Land is a website maintained by the Serverless Developer Advocate team to help you build serverless applications and includes workshops, code examples, blogs, and videos. There is now enhanced search functionality so you can search across resources, patterns, and video content.

SLand-search

ServerlessLand search

AWS Lambda

AWS Lambda has improved how concurrency works with Amazon SQS. You can now control the maximum number of concurrent Lambda functions invoked.

The launch blog post explains the scaling behavior of Lambda using this architectural pattern, challenges this feature helps address, and a demo of maximum concurrency in action.

Maximum concurrency is set to 10 for the SQS queue.

Maximum concurrency is set to 10 for the SQS queue.

AWS Lambda Powertools is an open-source library to help you discover and incorporate serverless best practices more easily. Lambda Powertools for .NET is now generally available and currently focused on three observability features: distributed tracing (Tracer), structured logging (Logger), and asynchronous business and application metrics (Metrics). Powertools is also available for Python, Java, and Typescript/Node.js programming languages.

To learn more:

Lambda announced a new feature, runtime management controls, which provide more visibility and control over when Lambda applies runtime updates to your functions. The runtime controls are optional capabilities for advanced customers that require more control over their runtime changes. You can now specify a runtime management configuration for each function with three settings, Automatic (default), Function update, or manual.

There are three new Amazon CloudWatch metrics for asynchronous Lambda function invocations: AsyncEventsReceived, AsyncEventAge, and AsyncEventsDropped. You can track the asynchronous invocation requests sent to Lambda functions to monitor any delays in processing and take corrective actions if required. The launch blog post explains the new metrics and how to use them to troubleshoot issues.

Lambda now supports Amazon DocumentDB change streams as an event source. You can use Lambda functions to process new documents, track updates to existing documents, or log deleted documents. You can use any programming language that is supported by Lambda to write your functions.

There is a helpful blog post suggesting best practices for developing portable Lambda functions that allow you to port your code to containers if you later choose to.

AWS Step Functions

AWS Step Functions has expanded its AWS SDK integrations with support for 35 additional AWS services including Amazon EMR Serverless, AWS Clean Rooms, AWS IoT FleetWise, AWS IoT RoboRunner and 31 other AWS services. In addition, Step Functions also added support for 1000+ new API actions from new and existing AWS services such as Amazon DynamoDB and Amazon Athena. For the full list of added services, visit AWS SDK service integrations.

Amazon EventBridge

Amazon EventBridge has launched the AWS Controllers for Kubernetes (ACK) for EventBridge and Pipes . This allows you to manage EventBridge resources, such as event buses, rules, and pipes, using the Kubernetes API and resource model (custom resource definitions).

EventBridge event buses now also support enhanced integration with Service Quotas. Your quota increase requests for limits such as PutEvents transactions-per-second, number of rules, and invocations per second among others will be processed within one business day or faster, enabling you to respond quickly to changes in usage.

AWS SAM

The AWS Serverless Application Model (SAM) Command Line Interface (CLI) has added the sam list command. You can now show resources defined in your application, including the endpoints, methods, and stack outputs required to test your deployed application.

AWS SAM has a preview of sam build support for building and packaging serverless applications developed in Rust. You can use cargo-lambda in the AWS SAM CLI build workflow and AWS SAM Accelerate to iterate on your code changes rapidly in the cloud.

You can now use AWS SAM connectors as a source resource parameter. Previously, you could only define AWS SAM connectors as a AWS::Serverless::Connector resource. Now you can add the resource attribute on a connector’s source resource, which makes templates more readable and easier to update over time.

AWS SAM connectors now also support multiple destinations to simplify your permissions. You can now use a single connector between a single source resource and multiple destination resources.

In October 2022, AWS released OpenID Connect (OIDC) support for AWS SAM Pipelines. This improves your security posture by creating integrations that use short-lived credentials from your CI/CD provider. There is a new blog post on how to implement it.

Find out how best to build serverless Java applications with the AWS SAM CLI.

AWS App Runner

AWS App Runner now supports retrieving secrets and configuration data stored in AWS Secrets Manager and AWS Systems Manager (SSM) Parameter Store in an App Runner service as runtime environment variables.

AppRunner also now supports incoming requests based on HTTP 1.0 protocol, and has added service level concurrency, CPU and Memory utilization metrics.

Amazon S3

Amazon S3 now automatically applies default encryption to all new objects added to S3, at no additional cost and with no impact on performance.

You can now use an S3 Object Lambda Access Point alias as an origin for your Amazon CloudFront distribution to tailor or customize data to end users. For example, you can resize an image depending on the device that an end user is visiting from.

S3 has introduced Mountpoint for S3, a high performance open source file client that translates local file system API calls to S3 object API calls like GET and LIST.

S3 Multi-Region Access Points now support datasets that are replicated across multiple AWS accounts. They provide a single global endpoint for your multi-region applications, and dynamically route S3 requests based on policies that you define. This helps you to more easily implement multi-Region resilience, latency-based routing, and active-passive failover, even when data is stored in multiple accounts.

Amazon Kinesis

Amazon Kinesis Data Firehose now supports streaming data delivery to Elastic. This is an easier way to ingest streaming data to Elastic and consume the Elastic Stack (ELK Stack) solutions for enterprise search, observability, and security without having to manage applications or write code.

Amazon DynamoDB

Amazon DynamoDB now supports table deletion protection to protect your tables from accidental deletion when performing regular table management operations. You can set the deletion protection property for each table, which is set to disabled by default.

Amazon SNS

Amazon SNS now supports AWS X-Ray active tracing to visualize, analyze, and debug application performance. You can now view traces that flow through Amazon SNS topics to destination services, such as Amazon Simple Queue Service, Lambda, and Kinesis Data Firehose, in addition to traversing the application topology in Amazon CloudWatch ServiceLens.

SNS also now supports setting content-type request headers for HTTPS notifications so applications can receive their notifications in a more predictable format. Topic subscribers can create a DeliveryPolicy that specifies the content-type value that SNS assigns to their HTTPS notifications, such as application/json, application/xml, or text/plain.

EDA Visuals collection added to Serverless Land

The Serverless Developer Advocate team has extended Serverless Land and introduced EDA visuals. These are small bite sized visuals to help you understand concept and patterns about event-driven architectures. Find out about batch processing vs. event streaming, commands vs. events, message queues vs. event brokers, and point-to-point messaging. Discover bounded contexts, migrations, idempotency, claims, enrichment and more!

EDA-visuals

EDA Visuals

To learn more:

Serverless Repos Collection on Serverless Land

There is also a new section on Serverless Land containing helpful code repositories. You can search for code repos to use for examples, learning or building serverless applications. You can also filter by use-case, runtime, and level.

Serverless Repos Collection

Serverless Repos Collection

Serverless Blog Posts

January

Jan 12 – Introducing maximum concurrency of AWS Lambda functions when using Amazon SQS as an event source

Jan 20 – Processing geospatial IoT data with AWS IoT Core and the Amazon Location Service

Jan 23 – AWS Lambda: Resilience under-the-hood

Jan 24 – Introducing AWS Lambda runtime management controls

Jan 24 – Best practices for working with the Apache Velocity Template Language in Amazon API Gateway

February

Feb 6 – Previewing environments using containerized AWS Lambda functions

Feb 7 – Building ad-hoc consumers for event-driven architectures

Feb 9 – Implementing architectural patterns with Amazon EventBridge Pipes

Feb 9 – Securing CI/CD pipelines with AWS SAM Pipelines and OIDC

Feb 9 – Introducing new asynchronous invocation metrics for AWS Lambda

Feb 14 – Migrating to token-based authentication for iOS applications with Amazon SNS

Feb 15 – Implementing reactive progress tracking for AWS Step Functions

Feb 23 – Developing portable AWS Lambda functions

Feb 23 – Uploading large objects to Amazon S3 using multipart upload and transfer acceleration

Feb 28 – Introducing AWS Lambda Powertools for .NET

March

Mar 9 – Server-side rendering micro-frontends – UI composer and service discovery

Mar 9 – Building serverless Java applications with the AWS SAM CLI

Mar 10 – Managing sessions of anonymous users in WebSocket API-based applications

Mar 14 –
Implementing an event-driven serverless story generation application with ChatGPT and DALL-E

Videos

Serverless Office Hours – Tues 10AM PT

Weekly office hours live stream. In each session we talk about a specific topic or technology related to serverless and open it up to helping you with your real serverless challenges and issues. Ask us anything you want about serverless technologies and applications.

January

Jan 10 – Building .NET 7 high performance Lambda functions

Jan 17 – Amazon Managed Workflows for Apache Airflow at Scale

Jan 24 – Using Terraform with AWS SAM

Jan 31 – Preparing your serverless architectures for the big day

February

Feb 07- Visually design and build serverless applications

Feb 14 – Multi-tenant serverless SaaS

Feb 21 – Refactoring to Serverless

Feb 28 – EDA visually explained

March

Mar 07 – Lambda cookbook with Python

Mar 14 – Succeeding with serverless

Mar 21 – Lambda Powertools .NET

Mar 28 – Server-side rendering micro-frontends

FooBar Serverless YouTube channel

Marcia Villalba frequently publishes new videos on her popular serverless YouTube channel. You can view all of Marcia’s videos at https://www.youtube.com/c/FooBar_codes.

January

Jan 12 – Serverless Badge – A new certification to validate your Serverless Knowledge

Jan 19 – Step functions Distributed map – Run 10k parallel serverless executions!

Jan 26 – Step Functions Intrinsic Functions – Do simple data processing directly from the state machines!

February

Feb 02 – Unlock the Power of EventBridge Pipes: Integrate Across Platforms with Ease!

Feb 09 – Amazon EventBridge Pipes: Enrichment and filter of events Demo with AWS SAM

Feb 16 – AWS App Runner – Deploy your apps from GitHub to Cloud in Record Time

Feb 23 – AWS App Runner – Demo hosting a Node.js app in the cloud directly from GitHub (AWS CDK)

March

Mar 02 – What is Amazon DynamoDB? What are the most important concepts? What are the indexes?

Mar 09 – Choreography vs Orchestration: Which is Best for Your Distributed Application?

Mar 16 – DynamoDB Single Table Design: Simplify Your Code and Boost Performance with Table Design Strategies

Mar 23 – 8 Reasons You Should Choose DynamoDB for Your Next Project and How to Get Started

Sessions with SAM & Friends

SAMFiends

AWS SAM & Friends

Eric Johnson is exploring how developers are building serverless applications. We spend time talking about AWS SAM as well as others like AWS CDK, Terraform, Wing, and AMPT.

Feb 16 – What’s new with AWS SAM

Feb 23 – AWS SAM with AWS CDK

Mar 02 – AWS SAM and Terraform

Mar 10 – Live from ServerlessDays ANZ

Mar 16 – All about AMPT

Mar 23 – All about Wing

Mar 30 – SAM Accelerate deep dive

Still looking for more?

The Serverless landing page has more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow the Serverless Developer Advocacy team on Twitter to see the latest news, follow conversations, and interact with the team.

Accelerate data insights with Elastic and Amazon Kinesis Data Firehose

Post Syndicated from Udayasimha Theepireddy original https://aws.amazon.com/blogs/big-data/accelerate-data-insights-with-elastic-and-amazon-kinesis-data-firehose/

This is a guest post co-written with Udayasimha Theepireddy from Elastic.

Processing and analyzing log and Internet of Things (IoT) data can be challenging, especially when dealing with large volumes of real-time data. Elastic and Amazon Kinesis Data Firehose are two powerful tools that can help make this process easier. For example, by using Kinesis Data Firehose to ingest data from IoT devices, you can stream data directly into Elastic for real-time analysis. This can help you identify patterns and anomalies in the data as they happen, allowing you to take action in real time. Additionally, by using Elastic to store and analyze log data, you can quickly search and filter through large volumes of log data to identify issues and troubleshoot problems.

In this post, we explore how to integrate Elastic and Kinesis Data Firehose to streamline log and IoT data processing and analysis. We walk you through a step-by-step example of how to send VPC flow logs to Elastic through Kinesis Data Firehose.

Solution overview

Elastic is an AWS ISV Partner that helps you find information, gain insights, and protect your data when you run on AWS. Elastic offers enterprise search, observability, and security features that are built on a single, flexible technology stack that can be deployed anywhere.

Kinesis Data Firehose is a popular service that delivers streaming data from over 20 AWS services such as AWS IoT Core and Amazon CloudWatch logs to over 15 analytical and observability tools such as Elastic. Kinesis Data Firehose provides a fast and easy way to send your VPC flow logs data to Elastic in minutes without a single line of code and without building or managing your own data ingestion and delivery infrastructure.

VPC flow logs capture the traffic information going to and from your network interfaces in your VPC. With the launch of Kinesis Data Firehose support to Elastic, you can analyze your VPC flow logs with just a few clicks. Kinesis Data Firehose provides a true end-to-end serverless mechanism to deliver your flow logs to Elastic, where you can use Elastic Dashboards to search through those logs, create dashboards, detect anomalies, and send alerts. VPC flow logs help you to answer questions like what percentage of your traffic is getting dropped, and how much traffic is getting generated for specific sources and destinations.

Integrating Elastic and Kinesis Data Firehose is a straightforward process. There are no agents and beats. Simply configure your Firehose delivery stream to send its data to Elastic’s endpoint.

The following diagram depicts this specific configuration of how to ingest VPC flow logs via Kinesis Data Firehose into Elastic.

In the past, users would have to use an AWS Lambda function to transform the incoming data from VPC flow logs into an Amazon Simple Storage Service (Amazon S3) bucket before loading it into Kinesis Data Firehose or create a CloudWatch Logs subscription that sends any incoming log events that match defined filters to the Firehose delivery stream.

With this new integration, you can set up this configuration directly from your VPC flow logs to Kinesis Data Firehose and into Elastic Cloud. (Note that Elastic Cloud must be deployed on AWS.)

Let’s walk through the details of configuring Kinesis Data Firehose and Elastic, and demonstrate ingesting data.

Prerequisites

To set up this demonstration, make sure you have the following prerequisites:

We walk through installing general AWS integration components into the Elastic Cloud deployment to ensure Kinesis Data Firehose connectivity. Refer to the full list of services supported by the Elastic/AWS integration for more information.

Deploy Elastic on AWS

Follow the instructions on the Elastic registration page to get started on Elastic Cloud.

Once logged in to Elastic Cloud, create a deployment on AWS. It’s important to ensure that the deployment is on AWS. The Firehose delivery stream connects specifically to an endpoint that needs to be on AWS.

After you create your deployment, copy the Elasticsearch endpoint to use in a later step.

The endpoint should be an AWS endpoint, such as https://thevaa-cluster-01.es.us-east-1.aws.found.io.

Enable Elastic’s AWS integration

In your deployment’s Elastic Integration section, navigate to the AWS integration and choose Install AWS assets.

Configure a Firehose delivery stream

Create a new delivery stream on the Kinesis Data Firehose console. This is where you provide the endpoint you saved earlier. Refer to the following screenshot for the destination settings, and for more details, refer to Choose Elastic for Your Destination.

In this example, we are pulling in VPC flow logs via the data stream parameter we added (logs-aws.vpcflow-default). The parameter es_datastream_name can be configured with one of the following types of logs:

  • logs-aws.cloudfront_logs-defaultAWS CloudFront logs
  • logs-aws.ec2_logs-defaultAmazon Elastic Compute Cloud (Amazon EC2) logs in CloudWatch
  • logs-aws.elb_logs-defaultElastic Load Balancing logs
  • logs-aws.firewall_logs-defaultAWS Network Firewall logs
  • logs-aws.route53_public_logs-defaultAmazon Route 53 public DNS queries logs
  • logs-aws.route53_resolver_logs-default – Route 53 DNS queries and responses logs
  • logs-aws.s3access-default – Amazon S3 server access log
  • logs-aws.vpcflow-default – VPC flow logs
  • logs-aws.waf-defaultAWS WAF logs

Deploy your application

Follow the instructions on the GitHub repo and instructions in the AWS Three Tier Web Architecture workshop to deploy your application.

After you install the app, get your credentials from AWS to use with Elastic’s AWS integration.

There are several options for credentials:

  • Use access keys directly
  • Use temporary security credentials
  • Use a shared credentials file
  • Use an AWS Identity and Access Management (IAM) role Amazon Resource Name (ARN)

For more details, refer to AWS Credentials and AWS Permissions.

Configure VPC flow logs to send to Kinesis Data Firehose

In the VPC for the application you deployed, you need to configure your VPC flow logs and point them to the Firehose delivery stream.

Validate the VPC flow logs

In the Elastic Observability view of the log streams, you should see the VPC flow logs coming in after a few minutes, as shown in the following screenshot.

Analyze VPC flow logs in Elastic

Now that you have VPC flow logs in Elastic Cloud, how can you analyze them? There are several analyses you can perform on the VPC flow log data:

  • Use Elastic’s Analytics Discover capabilities to manually analyze the data
  • Use Elastic Observability’s anomaly feature to identify anomalies in the logs
  • Use an out-of-the-box dashboard to further analyze the data

Use Elastic’s Analytics Discover to manually analyze data

In Elastic Analytics, you can search and filter your data, get information about the structure of the fields, and display your findings in a visualization. You can also customize and save your searches and place them on a dashboard.

For a complete understanding of Discover and all of Elastic’s Analytics capabilities, refer to Discover.

For VPC flow logs, it’s important to understand the following:

  • How many logs were accepted or rejected
  • Where potential security violations occur (source IPs from outside the VPC)
  • What port is generally being queried

For our example, we filter the logs on the following:

  • Delivery stream nameAWS-3-TIER-APP-VPC-LOGS
  • VPC flow log actionREJECT
  • Time frame – 5 hours
  • VPC network interface – Webserver 1 and Webserver 2 interfaces

We want to see what IP addresses are trying to hit our web servers. From that, we want to understand which IP addresses we’re getting the most REJECT actions from. We simply find the source.ip field and can quickly get a breakdown that shows 185.156.73.54 is the most rejected for the last 3 or more hours we’ve turned on VPC flow logs.

Additionally, we can create a visualization by choosing Visualize. We get the following donut chart, which we can add to a dashboard.

Additionally to IP addresses, we want to also see what port is being hit on our web servers.

We select the destination port field, and the pop-up shows us that port 8081 is being targeted. This port is generally used for the administration of Apache Tomcat. This is a potential security issue, however port 8081 is turned off for outside traffic, hence the REJECT.

Detect anomalies in Elastic Observability logs

In addition to Discover, Elastic Observability provides the ability to detect anomalies on logs using machine learning (ML). The feature has the following options:

  • Log rate – Automatically detects anomalous log entry rates
  • Categorization – Automatically categorizes log messages

For our VPC flow log, we enabled both features. When we look at what was detected for anomalous log entry rates, we get the following results.

Elastic immediately detected a spike in logs when we turned on VPC flow logs for our application. The rate change is being detected because we’re also ingesting VPC flow logs from another application for a couple of days prior to adding the application in this post.

We can drill down into this anomaly with ML and analyze further.

To learn more about the ML analysis you can utilize with your logs, refer to Machine learning.

Because we know that a spike exists, we can also use the Elastic AIOps Labs Explain Log Rate Spikes capability. Additionally, we’ve grouped them to see what is causing some of the spikes.

In the preceding screenshot, we can observe that a specific network interface is sending more VPC log flows than others. We can drill down into this further in Discover.

Use the VPC flow log dashboard

Finally, Elastic also provides an out-of-the-box dashboard to show the top IP addresses hitting your VPC, geographically where they are coming from, the time series of the flows, and a summary of VPC flow log rejects within the time frame.

You can enhance this baseline dashboard with the visualizations you find in Discover, as we discussed earlier.

Conclusion

This post demonstrated how to configure an integration with Kinesis Data Firehose and Elastic for efficient infrastructure monitoring of VPC flow logs in Elastic Kibana dashboards. Elastic offers flexible deployment options on AWS, supporting software as a service (SaaS), AWS Marketplace, and bring your own license (BYOL) deployments. Elastic also provides AWS Marketplace private offers. You have the option to deploy and run the Elastic Stack yourself within your AWS account, either free or with a paid subscription from Elastic. To get started, visit the Kinesis Data Firehose console and specify Elastic as the destination. To learn more, explore the Amazon Kinesis Data Firehose Developer Guide.


About the Authors

Udayasimha Theepireddy is an Elastic Principal Solution Architect, where he works with customers to solve real world technology problems using Elastic and AWS services. He has a strong background in technology, business, and analytics.

Antony Prasad Thevaraj is a Sr. Partner Solutions Architect in Data and Analytics at AWS. He has over 12 years of experience as a Big Data Engineer, and has worked on building complex ETL and ELT pipelines for various business units.

Mostafa Mansour is a Principal Product Manager – Tech at Amazon Web Services where he works on Amazon Kinesis Data Firehose. He specializes in developing intuitive product experiences that solve complex challenges for customers at scale. When he’s not hard at work on Amazon Kinesis Data Firehose, you’ll likely find Mostafa on the squash court, where he loves to take on challengers and perfect his dropshots.

Serverless logging with Amazon OpenSearch Service and Amazon Kinesis Data Firehose

Post Syndicated from Jon Handler original https://aws.amazon.com/blogs/big-data/serverless-logging-with-amazon-opensearch-service-and-amazon-kinesis-data-firehose/

In this post, you will learn how you can use Amazon Kinesis Data Firehose to build a log ingestion pipeline to send VPC flow logs to Amazon OpenSearch Serverless. First, you create the OpenSearch Serverless collection you use to store VPC flow logs, then you create a Kinesis Data Firehose delivery pipeline that forwards the flow logs to OpenSearch Serverless. Finally, you enable delivery of VPC flow logs to your Firehose delivery stream. The following diagram illustrates the solution workflow.

OpenSearch Serverless is a new serverless option offered by Amazon OpenSearch Service. OpenSearch Serverless makes it simple to run petabyte-scale search and analytics workloads without having to configure, manage, or scale OpenSearch clusters. OpenSearch Serverless automatically provisions and scales the underlying resources to deliver fast data ingestion and query responses for even the most demanding and unpredictable workloads.

Kinesis Data Firehose is a popular service that delivers streaming data from over 20 AWS services to over 15 analytical and observability tools such as OpenSearch Serverless. Kinesis Data Firehose is great for those looking for a fast and easy way to send your VPC flow logs data to your OpenSearch Serverless collection in minutes without a single line of code and without building or managing your own data ingestion and delivery infrastructure.

VPC flow logs capture the traffic information going to and from your network interfaces in your VPC. With the launch of Kinesis Data Firehose support to OpenSearch Serverless, it makes an easy solution to analyze your VPC flow logs with just a few clicks. Kinesis Data Firehose provides a true end-to-end serverless mechanism to deliver your flow logs to OpenSearch Serverless, where you can use OpenSearch Dashboards to search through those logs, create dashboards, detect anomalies, and send alerts. VPC flow logs helps you to answer questions like:

  • What percentage of your traffic is getting dropped?
  • How much traffic is getting generated for specific sources and destinations?

Create your OpenSearch Serverless collection

To get started, you first create a collection. An OpenSearch Serverless collection is a logical grouping of one or more indexes that represent an analytics workload. Complete the following steps:

  1. On the OpenSearch Service console, choose Collections under Serverless in the navigation pane.
  2. Choose Create a collection.
  3. For Collection name, enter a name (for example, vpc-flow-logs).
  4. For Collection type¸ choose Time series.
  5. For Encryption, choose your preferred encryption setting:
    1. Choose Use AWS owned key to use an AWS managed key.
    2. Choose a different AWS KMS key to use your own AWS Key Management Service (AWS KMS) key.
  6. For Network access settings, choose your preferred setting:
    1. Choose VPC to use a VPC endpoint.
    2. Choose Public to use a public endpoint.

AWS recommends that you use a VPC endpoint for all production workloads. For this walkthrough, select Public.

  1. Choose Create.

It should take couple of minutes to create the collection.

The following graphic gives a quick demonstration of creating the OpenSearch Serverless collection via the preceding steps.

At this point, you have successfully created a collection for OpenSearch Serverless. Next, you create a delivery pipeline for Kinesis Data Firehose.

Create a Kinesis Data Firehose delivery stream

To set up a delivery stream for Kinesis Data Firehose, complete the following steps:

  1. On the Kinesis Data Firehose console, choose Create delivery stream.
  2. For Source, specify Direct PUT.

Check out Source, Destination, and Name to learn more about different sources supported by Kinesis Data Firehose.

  1. For Destination, choose Amazon OpenSearch Serverless.
  2. For Delivery stream name, enter a name (for example, vpc-flow-logs).
  3. Under Destination settings, in the OpenSearch Serverless collection settings, choose Browse.
  4. Select vpc-flow-logs.
  5. Choose Choose.

If your collection is still creating, wait a few minutes and try again.

  1. For Index, specify vpc-flow-logs.
  2. In the Backup settings section, select Failed data only for the Source record backup in Amazon S3.

Kinesis Data Firehose uses Amazon Simple Storage Service (Amazon S3) to back up failed data that it attempts to deliver to your chosen destination. If you want to keep all data, select All data.

  1. For S3 Backup Bucket, choose Browse to select an existing S3 bucket, or choose Create to create a new bucket.
  2. Choose Create delivery stream.

The following graphic gives a quick demonstration of creating the Kinesis Data Firehose delivery stream via the preceding steps.

At this point, you have successfully created a delivery stream for Kinesis Data Firehose, which you will use to stream data from your VPC flow logs and send it to your OpenSearch Serverless collection.

Set up the data access policy for your OpenSearch Serverless collection

Before you send any logs to OpenSearch Serverless, you need to create a data access policy within OpenSearch Serverless that allows Kinesis Data Firehose to write to the vpc-flow-logs index in your collection. Complete the following steps:

  1. On the Kinesis Data Firehose console, choose the Configuration tab on the details page for the vpc-flow-logs delivery stream you just created.
  2. In the Permissions section, note down the AWS Identity and Access Management (IAM) role.
  3. Navigate to the vpc-flow-logs collection details page on the OpenSearch Serverless dashboard.
  4. Under Data access, choose Manage data access.
  5. Choose Create access policy.
  6. In the Name and description section, specify an access policy name, add a description, and select JSON as the policy definition method.
  7. Add the following policy in the JSON editor. Provide the collection name and index you specified during the delivery stream creation in the policy. Provide the IAM role name that you got from the permissions page of the Firehose delivery stream, and the account ID for your AWS account.
    [
      {
        "Rules": [
          {
            "ResourceType": "index",
            "Resource": [
              "index/<collection-name>/<index-name>"
            ],
            "Permission": [
              "aoss:WriteDocument",
              "aoss:CreateIndex",
              "aoss:UpdateIndex"
            ]
          }
        ],
        "Principal": [
          "arn:aws:sts::<aws-account-id>:assumed-role/<IAM-role-name>/*"
        ]
      }
    ]

  8. Choose Create.

The following graphic gives a quick demonstration of creating the data access policy via the preceding steps.

Set up VPC flow logs

In the final step of this post, you enable flow logs for your VPC with the destination as Kinesis Data Firehose, which sends the data to OpenSearch Serverless.

  1. Navigate to the AWS Management Console.
  2. Search for “VPC” and then choose Your VPCs in the search result (hover over the VPC rectangle to reveal the link).
  3. Choose the VPC ID link for one of your VPCs.
  4. On the Flow Logs tab, choose Create flow log.
  5. For Name, enter a name.
  6. Leave the Filter set to All. You can limit the traffic by selecting Accept or Reject.
  7. Under Destination, select Send to Kinesis Firehose in the same account.
  8. For Kinesis Firehose delivery stream name, choose vpc-flow-logs.
  9. Choose Create flow log.

The following graphic gives a quick demonstration of creating a flow log for your VPC following the preceding steps.

Examine the VPC flow logs data in your collection using OpenSearch Dashboards

You won’t be able to access your collection data until you configure data access. Data access policies allow users to access the actual data within a collection.

To create a data access policy for OpenSearch Dashboards, complete the following steps:

  1. Navigate to the vpc-flow-logs collection details page on the OpenSearch Serverless dashboard.
  2. Under Data access, choose Manage data access.
  3. Choose Create access policy.
  4. In the Name and description section, specify an access policy name, add a description, and select JSON as the policy definition method.
  5. Add the following policy in the JSON editor. Provide the collection name and index you specified during the delivery stream creation in the policy. Additionally, provide the IAM user and the account ID for your AWS account. You need to make sure that you have the AWS access and secret keys for the principal that you specified as an IAM user.
    [
      {
        "Rules": [
          {
            "Resource": [
              "index/<collection-name>/<index-name>"
            ],
            "Permission": [
              "aoss:ReadDocument"
            ],
            "ResourceType": "index"
          }
        ],
        "Principal": [
          "arn:aws:iam::<aws-account-id>:user/<IAM-user-name>"
        ]
      }
    ]

  6. Choose Create.
  7. Navigate to OpenSearch Serverless and choose the collection you created (vpc-flow-logs).
  8. Choose the OpenSearch Dashboards URL and log in with your IAM access key and secret key for the user you specified under Principal.
  9. Navigate to dev tools within OpenSearch Dashboards and run the following query to retrieve the VPC flow logs for your VPC:
    GET <index-name>/_search
    {
      "query": {
        "match_all": {}
      }
    }

The query returns the data as shown in the following screenshot, which contains information such as account ID, interface ID, source IP address, destination IP address, and more.

Create dashboards

After the data is flowing into OpenSearch Serverless, you can easily create dashboards to monitor the activity in your VPC. The following example dashboard shows overall traffic, accepted and rejected traffic, bytes transmitted, and some charts with the top sources and destinations.

Clean up

If you don’t want to continue using the solution, be sure to delete the resources you created:

  1. Return to the AWS console and in the VPCs section, disable the flow logs for your VPC.
  2. In the OpenSearch Serverless dashboard, delete your vpc-flow-logs collection.
  3. On the Kinesis Data Firehose console, delete your vpc-flow-logs delivery stream.

Conclusion

In this post, you created an end-to-end serverless pipeline to deliver your VPC flow logs to OpenSearch Serverless using Kinesis Data Firehose. In this example, you built a delivery pipeline for your VPC flow logs, but you can also use Kinesis Data Firehose to send logs from Amazon Kinesis Data Streams and Amazon CloudWatch, which in turn can be sent to OpenSearch Serverless collections for running analytics on those logs. With serverless solutions on AWS, you can focus on your application development rather than worrying about the ingestion pipeline and tools to visualize your logs.

Get hands-on with OpenSearch Serverless by taking the Getting Started with Amazon OpenSearch Serverless workshop and check out other pipelines for analyzing your logs.

If you have feedback about this post, share it in the comments section. If you have questions about this post, start a new thread on the Amazon OpenSearch Service forum or contact AWS Support.


About the authors

Jon Handler (@_searchgeek) is a Principal Solutions Architect at Amazon Web Services based in Palo Alto, CA. Jon works closely with the CloudSearch and Elasticsearch teams, providing help and guidance to a broad range of customers who have search workloads that they want to move to the AWS Cloud. Prior to joining AWS, Jon’s career as a software developer included four years of coding a large-scale, eCommerce search engine.

Prashant Agrawal is a Sr. Search Specialist Solutions Architect with Amazon OpenSearch Service. He works closely with customers to help them migrate their workloads to the cloud and helps existing customers fine-tune their clusters to achieve better performance and save on cost. Before joining AWS, he helped various customers use OpenSearch and Elasticsearch for their search and log analytics use cases. When not working, you can find him traveling and exploring new places. In short, he likes doing Eat → Travel → Repeat.

Stream VPC flow logs to Amazon OpenSearch Service via Amazon Kinesis Data Firehose

Post Syndicated from Chaitanya Shah original https://aws.amazon.com/blogs/big-data/stream-vpc-flow-logs-to-amazon-opensearch-service-via-amazon-kinesis-data-firehose/

Amazon Virtual Private Cloud (Amazon VPC) flow logs enable you to track the IP traffic going to and from the network interfaces in your VPC for your workloads. Analyzing VPC logs helps you understand how your applications are communicating over your VPC network with log records and acts as a main source of information to the network in your VPC. After collecting the flow logs, the next step is performing log analysis to understand user or application behavior and patterns to make informed decisions. You can analyze logs using log analytics tools such as Amazon OpenSearch Service.

Amazon Kinesis Data Firehose is a fully managed service for delivering near real-time streaming data to various destinations for storage and performing near real-time analytics. With its extensible data transformation capabilities, you can also streamline log processing and log delivery pipelines into a single Firehose delivery stream.

Amazon OpenSearch Service makes it easy for you to perform interactive log analytics, real-time application monitoring, website search, and more. Amazon OpenSearch is an open source, distributed search and analytics suite. Amazon OpenSearch Service offers the latest versions of OpenSearch, support for 19 versions of Elasticsearch (1.5 to 7.10 versions), as well as visualization capabilities powered by OpenSearch Dashboards and Kibana (1.5 to 7.10 versions). Amazon OpenSearch Service currently has tens of thousands of active customers with hundreds of thousands of clusters under management processing trillions of requests per month.

In this post, you will learn how to ingest VPC flow logs with Kinesis Data Firehose and deliver them to an Amazon OpenSearch Service for analysis using OpenSearch Service Dashboards.

Overview of solution

This solution uses native integration of VPC flow logs streaming to Kinesis Data Firehose. We use a Firehose delivery stream to buffer the streamed VPC flow logs, and deliver those to an OpenSearch Service destination endpoint. We use Amazon OpenSearch Service Dashboards to create an index pattern for the VPC flow logs to analyze and visualize the logs in a near-real time. The following diagram illustrates this architecture.

Solution Architecture

We walk you through the following high-level steps:

  1. Create an OpenSearch Service domain for storing and analyzing the VPC flow logs.
  2. Create a Firehose delivery stream to deliver the flow logs to the OpenSearch Service domain.
  3. Create a VPC flow log subscription to the delivery stream.
  4. Explore VPC flow logs in OpenSearch Service Dashboards
    • Create role mapping with an OpenSearch Service user to the Kinesis Data Firehose service role. Because we’re using a public access domain for OpenSearch Service, we have to map the delivery stream AWS Identity and Access Management (IAM) role to the OpenSearch Service primary user to deliver logs in bulk to the OpenSearch Service domain.
    • Create an index pattern in OpenSearch Service Dashboards to enable analysis and visualization of VPC logs.

Prerequisites

As a prerequisite, you need to create an Amazon Simple Storage Service (Amazon S3) bucket to store the Firehose delivery stream backups and failed logs.

Create an Amazon OpenSearch Service domain

For demonstration purposes, and to limit the costs, we create an OpenSearch Service domain with the Development and testing deployment type and public access to the dashboard. For instructions, refer to Create an Amazon OpenSearch Service domain. Note that we select Public access only for demo purposes. For production, we recommend using VPC access for security reasons.

When it’s complete, the OpenSearch Service domain shows as Active.

OpenSearch Domain

Create a Kinesis Data Firehose delivery stream

Now that your Amazon OpenSearch Service domain is active, you can create a Firehose delivery stream where VPC flow logs are streamed.

  1. On the Amazon Kinesis console, choose Kinesis Data Firehose in the navigation pane, then choose Create delivery stream.
  2. Choose Direct PUT as the source and set the destination as Amazon OpenSearch Service.
  3. For Delivery stream name, enter PUT-OPENSEARCH-STREAM-DEMO.Kinesis Delivery Stream
  4. In the Destination settings section, choose Browse and choose the previously created Amazon OpenSearch Service domain.
  5. For Index name, enter vpcflowlogs.
  6. For Index rotation, choose Every day.
  7. For this post, we set Buffer size to 5 and Buffer interval to 900.You can modify these settings to optimize ingestion throughput and near-real-time behavior.
    Kinesis Stream Destination setting
  1. In the Backup settings section, for Source record backup in Amazon S3, select Failed events only so you only save the data that fails to deliver to Amazon OpenSearch Service.
  2. For S3 bucket, choose Browse and choose the S3 bucket you created to store failed logs and backups.
  3. Optionally, you can input a prefix for backup files and error files.
  4. Select GZIP for Compression for data records.
  5. For Encryption for data records, select Disabled.Kinesis Stream - Backup Setting
  6. Expand Advanced settings, and for Amazon CloudWatch error logging, select Enabled.
  7. Choose Create delivery stream.Kinesis Stream - Advance Setting

When the delivery stream is active, proceed to the next step.

Create a VPC flow logs subscription

Now you create a VPC flow logs subscription for the Firehose delivery stream you created in the previous step.

  1. On the Amazon VPC console, choose Your VPCs.
  2. Select the VPC for which to create the flow log.
  3. On the Actions menu, choose Create flow log.VPC Flow Log
  4. Select All to send all flow log records to Amazon OpenSearch Service.

If you want to filter the flow logs, you can select either Accept or Reject.

  1. For Maximum aggregation interval, select 10 minutes or the minimum setting of 1 minute if you need the flow log data to be available for near-real-time analysis in Amazon OpenSearch Service.
  2. For Destination, select Send to Kinesis Firehose in the same account if the delivery stream is set up on the same account where you create the VPC flow logs.
  3. For Log record format, if you leave it at AWS default format, the flow logs are sent as version 2 format.

Alternatively, you can specify which fields you need the flow logs to capture and send to an Amazon OpenSearch Service. For more information on log format and available fields, refer to Flow log records.

  1. Choose Create flow log.Create VPC Flow Logs

Now let’s explore the VPC flow logs in Amazon OpenSearch Service.

Explore VPC flow logs in Amazon OpenSearch Service Dashboards

In the final step, we set up OpenSearch Service Dashboards to explore the VPC flow logs.

  1. On the OpenSearch Service console, choose Domains in the navigation pane.
  2. Choose the domain you created.
  3. Under OpenSearch Dashboards URL, choose the link to open a new tab.OpenSearch Dashboard
  4. Log in with the user you created during OpenSearch Service domain setup.OpenSearch Service Dashboard
  5. Select Private for Select your tenant, then choose Confirm.OpenSearch Service Dashboard Tenant

Because we used a public access domain for OpenSearch Service, you need to map the role created for the Firehose delivery stream to the OpenSearch Service Dashboards user, so that the delivery stream can deliver logs in bulk to the OpenSearch Service domain.

  1. On the menu icon, choose Security.
  2. Choose Roles.
  3. Choose the all_access role.OpenSearch Service All Access Role
  4. On the Mapped users tab, choose Manage mapping.OpenSearch Service Dashboard map role
  5. For Backend roles, enter the IAM role ARN created for the Firehose delivery stream.
  6. Choose Map.OpenSearch Service Dashboard Map role arn
  7. Now that mapping is complete, choose the menu icon, then choose Stack management.OpenSearch Service Dashboard Stack Management
  8. Choose Index Patterns, then choose Create index pattern.
  9. For Index pattern name, enter vpcflowlogs*.
  10. Choose Next step.OpenSearch Service Dashboard Create Index
  11. Navigate to the Discover menu option.You can see the VPC flow logs from your VPC in this dashboard. Now you can search and visualize the flow logs that are being streamed in near-real time to the OpenSearch Service domain.
    OpenSearch Service Dashboard Discover

Clean up

After you test out this solution, remember to delete all the resources you created to avoid incurring future charges:

  1. Delete your Amazon OpenSearch Service domain.
  2. Delete the VPC flow logs subscription.
  3. Delete the Firehose delivery stream.
  4. Delete the S3 bucket for the VPC flow logs backup and failed logs.
  5. If you created a new VPC and new resources in the VPC, delete the resources and VPC.

Conclusion

In this post, we walked through a solution of how integrate VPC flow logs with a Kinesis Data Firehose delivery stream and deliver it to an Amazon OpenSearch Service destination with no code and visualize it in OpenSearch Service Dashboards.

Try this new quick and hassle-free way of sending your VPC flow logs to an Amazon OpenSearch Service using Kinesis Data Firehose.


About the Author

Chaitanya Shah is a Sr. Technical Account Manager with AWS, based out of New York. He has over 22 years of experience working with enterprise customers. He loves to code and actively contributes to the AWS solutions labs to help customers solve complex problems. He provides guidance to AWS customers on best practices for their AWS Cloud migrations. He is also specialized in AWS data transfer and the data and analytics domain.

Monitor AWS workloads without a single line of code with Logz.io and Kinesis Firehose

Post Syndicated from Amos Etzion original https://aws.amazon.com/blogs/big-data/monitor-aws-workloads-without-a-single-line-of-code-with-logz-io-and-kinesis-firehose/

Observability data provides near real-time insights into the health and performance of AWS workloads, so that engineers can quickly address production issues and troubleshoot them before widespread customer impact.

As AWS workloads grow, observability data has been exploding, which requires flexible big data solutions to handle the throughput of large and unpredictable volumes of observability data.

Solution overview

One option is Amazon Kinesis Data Firehose, which is a popular service for streaming huge volumes of AWS data for storage and analytics. By pulling data from Amazon CloudWatch, Amazon Kinesis Data Firehose can deliver data to observability solutions.

Among these observability solutions is Logz.io, which can now ingest metric data from Amazon Kinesis Data Firehose and make it easier to get metrics from your AWS account to your Logz.io account for analysis, alerting, and correlation with logs and traces.

In a few clicks and a few configurations, we’ll see how you can start streaming your metric data (and soon, log data!) to Logz.io for storage and analysis.

Prerequisites

  • Logz.io account – Create a free trial here
  • Logz.io shipping token – Learn about metrics tokens here. You need to be a Logz.io administrator.
  • Access to Amazon CloudWatch and Amazon Kinesis Data Firehose with the appropriate permissions to manage HTTP endpoints.
  • Appropriate permissions to create an Amazon Simple Storage Service (Amazon S3) bucket

Sending Amazon CloudWatch metric data to Logz.io with an Amazon Kinesis Data Firehose

Amazon Kinesis Data Firehose is a service for ingesting, processing, and loading data from large, distributed sources such as logs or clickstreams into multiple consumers for storage and real-time analytics. Kinesis Data Firehose supports more than 50 sources and destinations as of today. This integration can be set up in minutes without a single line of code and enables near real-time analytics for observability data generated by AWS services by using Amazon CloudWatch, Amazon Kinesis Data Firehose, and Logz.io.

Once the integration is configured, Logz.io customers can open the Infrastructure Monitoring product to see their data coming in and populating their dashboards. To see some of the data analytics and correlation you get with Logz.io, check out this short demonstration.

Let’s begin a step-by-step tutorial for setting up the integration.

  • Start by going to Amazon Kinesis Data Firehose and creating a delivery stream with Data Firehose.

Kinesis Firehose Console

  • Next you select a source and destination. Select Direct Put as the source and Logz.io the destination.
  • Next, configure the destination settings. Give the HTTP endpoint a name, which should include logz.io.
  • Select from the dropdown the appropriate endpoint you would like to use.

If you’re sending data to a European region, then set it to Logz.io Metrics EU. Or you can use the us-east-1 destination by selecting Logz.io Metrics US.

  • Next, add your Logz.io Shipping Token. You can find this by going to Settings in Logz.io and selecting Manage Tokens, which requires Logz.io administrator to access. This ensures that your account is only ingesting data from the defined sources (e.g., this Amazon Kinesis Data Firehose delivery stream).

Kinesis Stream config

Keep Content encoding on Disabled and set your desired Retry Duration.

You can also configure Buffer hints to your preferences.

  • Next, determine your Backup settings in case something goes wrong. In most cases, it’s only necessary to back up the failed data. Simply choose an Amazon S3 bucket or create a new one to store data if it doesn’t make it to Logz.io. Then, select Create a delivery stream.

Now it’s time to connect Amazon CloudWatch to our Amazon Kinesis Data Firehose Delivery Stream.

  • Navigate to Amazon CloudWatch and select Streams in the Metrics menu. Select Create metrics stream.
  • Next, you can either select to send all your Amazon CloudWatch metrics to Logz.io, or only metrics from specified namespaces.

In this case, we chose Amazon Elastic Compute Cloud (Amazon EC2), Amazon Relational Database Service (Amazon RDS), AWS Lambda, and Elastic Load Balancing (ELB).

  • Under Configuration, choose the Select an existing Firehose owned by your account option and choose the Amazon Kinesis Data Firehose you just configured.

Metric Streams Config

If you’d like, you can choose additional statistics in the Add additional statistics box, which provides helpful metrics in terms of percentiles to monitor like latency metrics (i.e., which services have the highest average latency). This may increase your costs.

  • Lastly, give your metric stream a name and hit Create metric stream.

That’s it! Without writing a single line of code, we configured an integration with AWS and Logz.io that enables fast and easy infrastructure monitoring through Amazon CloudWatch data collection.

Your metrics will be stored in Logz.io for 18 months out of the box, without requiring any overhead management.

You can also begin to build dashboards and alerts to begin monitoring – like this Amazon EC2 monitoring dashboard below.

ec2 monitoring dashboard Logz.io

Conclusion

This post demonstrated how to configure an integration with AWS and Logz.io for efficient infrastructure monitoring through Amazon CloudWatch.

To learn more about building metrics dashboards in Logz.io, you can watch this video.

Currently, some users might find that they are sending more data than they really need, which can raise costs. In future versions of this integration, it will be easier to narrow down the metrics to reduce costs.

Want to try it yourself? Create a Logz.io account today, navigate to our infrastructure monitoring product, and start streaming metric data to Logz.io to start monitoring.


About the authors

Amos Etzion – Product Manager at Logz.io

Charlie Klein – Product Marketing Manager at Logz.io

Mark Kriaf – Partner Solutions Architect at AWS

Building a healthcare data pipeline on AWS with IBM Cloud Pak for Data

Post Syndicated from Eduardo Monich Fronza original https://aws.amazon.com/blogs/architecture/building-a-healthcare-data-pipeline-on-aws-with-ibm-cloud-pak-for-data/

Healthcare data is being generated at an increased rate with the proliferation of connected medical devices and clinical systems. Some examples of these data are time-sensitive patient information, including results of laboratory tests, pathology reports, X-rays, digital imaging, and medical devices to monitor a patient’s vital signs, such as blood pressure, heart rate, and temperature.

These different types of data can be difficult to work with, but when combined they can be used to build data pipelines and machine learning (ML) models to address various challenges in the healthcare industry, like the prediction of patient outcome, readmission rate, or disease progression.

In this post, we demonstrate how to bring data from different sources, like Snowflake and connected health devices, to form a healthcare data lake on Amazon Web Services (AWS). We also explore how to use this data with IBM Watson to build, train, and deploy ML models. You can learn how to integrate model endpoints with clinical health applications to generate predictions for patient health conditions.

Solution overview

The main parts of the architecture we discuss are (Figure 1):

  1. Using patient data to improve health outcomes
  2. Healthcare data lake formation to store patient health information
  3. Analyzing clinical data to improve medical research
  4. Gaining operational insights from healthcare provider data
  5. Providing data governance to maintain the data privacy
  6. Building, training, and deploying an ML model
  7. Integration with the healthcare system
Data pipeline for the healthcare industry using IBM CP4D on AWS

Figure 1. Data pipeline for the healthcare industry using IBM CP4D on AWS

IBM Cloud Pak for Data (CP4D) is deployed on Red Hat OpenShift Service on AWS (ROSA). It provides the components IBM DataStage, IBM Watson Knowledge Catalogue, IBM Watson Studio, IBM Watson Machine Learning, plus a wide variety of connections with data sources available in a public cloud or on-premises.

Connected health devices, on the edge, use sensors and wireless connectivity to gather patient health data, such as biometrics, and send it to the AWS Cloud through Amazon Kinesis Data Firehose. AWS Lambda transforms the data that is persisted to Amazon Simple Storage Service (Amazon S3), making that information available to healthcare providers.

Amazon Simple Notification Service (Amazon SNS) is used to send notifications whenever there is an issue with the real-time data ingestion from the connected health devices. In case of failures, messages are sent via Amazon SNS topics for rectifying and reprocessing of failure messages.

DataStage performs ETL operations and move patient historical information from Snowflake into Amazon S3. This data, combined with the data from the connected health devices, form a healthcare data lake, which is used in IBM CP4D to build and train ML models.

The pipeline described in architecture uses Watson Knowledge Catalogue, which provides data governance framework and artifacts to enrich our data assets. It protects sensitive patient information from unauthorized access, like individually identifiable information, medical history, test results, or insurance information.

Data protection rules define how to control access to data, mask sensitive values, or filter rows from data assets. The rules are automatically evaluated and enforced each time a user attempts to access a data asset in any governed catalog of the platform.

After this, the datasets are published to Watson Studio projects, where they are used to train ML models. You can develop models using Jupyter Notebook, IBM AutoAI (low-code), or IBM SPSS modeler (no-code).

For the purpose of this use case, we used logistic regression algorithm for classifying and predicting the probability of an event, such as disease risk management to assist doctors in making critical medical decisions. You can also build ML models using algorithms like Classification, Random Forest, and K-Nearest Neighbor. These are widely used to predict disease risk.

Once the models are trained, they are exposed as endpoints with Watson Machine Learning and integrated with the healthcare application to generate predictions by analyzing patient symptoms.

The healthcare applications are a type of clinical software that offer crucial physiological insights and predict the effects of illnesses and possible treatments. It provides built-in dashboards that display patient information together with the patient’s overall metrics for outcomes and treatments. This can help healthcare practitioners gain insights into patient conditions. It also can help medical institutions prioritize patients with more risk factors and curate clinical and behavioral health plans.

Finally, we are using IBM Security QRadar XDR SIEM to collect, process, and aggregate Amazon Virtual Private Cloud (Amazon VPC) flow logs, AWS CloudTrail logs, and IBM CP4D logs. QRadar XDR uses this information to manage security by providing real-time monitoring, alerts, and responses to threats.

Healthcare data lake

A healthcare data lake can help health organizations turn data into insights. It is centralized, curated, and securely stores data on Amazon S3. It also enables you to break down data silos and combine different types of analytics to gain insights. We are using the DataStage, Kinesis Data Firehose, and Amazon S3 services to build the healthcare data lake.

Data governance

Watson Knowledge Catalogue provides an ML catalogue for data discovery, cataloging, quality, and governance. We define policies in Watson Knowledge Catalogue to enable data privacy and overall access to and utilization of this data. This includes sensitive data and personal information that needs to be handled through data protection, quality, and automation rules. To learn more about IBM data governance, please refer to Running a data quality analysis (Watson Knowledge Catalogue).

Build, train, and deploy the ML model

Watson Studio empowers data scientists, developers, and analysts to build, run, and manage AI models on IBM CP4D.

In this solution, we are building models using Watson Studio by:

  1. Promoting the governed data from Watson Knowledge Catalogue to Watson Studio for insights
  2. Using ETL features, such as built-in search, automatic metadata propagation, and simultaneous highlighting, to process and transform large amounts of data
  3. Training the model, including model technique selection and application, hyperparameter setting and adjustment, validation, ensemble model development and testing; algorithm selection; and model optimization
  4. Evaluating the model based on metric evaluation, confusion matrix calculations, KPIs, model performance metrics, model quality measurements for accuracy and precision
  5. Deploying the model on Watson Machine Learning using online deployments, which create an endpoint to generate a score or prediction in real time
  6. Integrating the endpoint with applications like health applications, as demonstrated in Figure 1

Conclusion

In this blog, we demonstrated how to use patient data to improve health outcomes by creating a healthcare data lake and analyzing clinical data. This can help patients and healthcare practitioners make better, faster decisions and prioritize cases. We also discussed how to build an ML model using IBM Watson and integrate it with healthcare applications for health analysis.

Additional resources

Enrich VPC Flow Logs with resource tags and deliver data to Amazon S3 using Amazon Kinesis Data Firehose

Post Syndicated from Chaitanya Shah original https://aws.amazon.com/blogs/big-data/enrich-vpc-flow-logs-with-resource-tags-and-deliver-data-to-amazon-s3-using-amazon-kinesis-data-firehose/

VPC Flow Logs is an AWS feature that captures information about the network traffic flows going to and from network interfaces in Amazon Virtual Private Cloud (Amazon VPC). Visibility to the network traffic flows of your application can help you troubleshoot connectivity issues, architect your application and network for improved performance, and improve security of your application.

Each VPC flow log record contains the source and destination IP address fields for the traffic flows. The records also contain the Amazon Elastic Compute Cloud (Amazon EC2) instance ID that generated the traffic flow, which makes it easier to identify the EC2 instance and its associated VPC, subnet, and Availability Zone from where the traffic originated. However, when you have a large number of EC2 instances running in your environment, it may not be obvious where the traffic is coming from or going to simply based on the EC2 instance IDs or IP addresses contained in the VPC flow log records.

By enriching flow log records with additional metadata such as resource tags associated with the source and destination resources, you can more easily understand and analyze traffic patterns in your environment. For example, customers often tag their resources with resource names and project names. By enriching flow log records with resource tags, you can easily query and view flow log records based on an EC2 instance name, or identify all traffic for a certain project.

In addition, you can add resource context and metadata about the destination resource such as the destination EC2 instance ID and its associated VPC, subnet, and Availability Zone based on the destination IP in the flow logs. This way, you can easily query your flow logs to identify traffic crossing Availability Zones or VPCs.

In this post, you will learn how to enrich flow logs with tags associated with resources from VPC flow logs in a completely serverless model using Amazon Kinesis Data Firehose and the recently launched Amazon VPC IP Address Manager (IPAM), and also analyze and visualize the flow logs using Amazon Athena and Amazon QuickSight.

Solution overview

In this solution, you enable VPC flow logs and stream them to Kinesis Data Firehose. This solution enriches log records using an AWS Lambda function on Kinesis Data Firehose in a completely serverless manner. The Lambda function fetches resource tags for the instance ID. It also looks up the destination resource from the destination IP using the Amazon EC2 API and IPAM, and adds the associated VPC network context and metadata for the destination resource. It then stores the enriched log records in an Amazon Simple Storage Service (Amazon S3) bucket. After you have enriched your flow logs, you can query, view, and analyze them in a wide variety of services, such as AWS Glue, Athena, QuickSight, Amazon OpenSearch Service, as well as solutions from the AWS Partner Network such as Splunk and Datadog.

The following diagram illustrates the solution architecture.

Architecture

The workflow contains the following steps:

  1. Amazon VPC sends the VPC flow logs to the Kinesis Data Firehose delivery stream.
  2. The delivery stream uses a Lambda function to fetch resource tags for instance IDs from the flow log record and add it to the record. You can also fetch tags for the source and destination IP address and enrich the flow log record.
  3. When the Lambda function finishes processing all the records from the Kinesis Data Firehose buffer with enriched information like resource tags, Kinesis Data Firehose stores the result file in the destination S3 bucket. Any failed records that Kinesis Data Firehose couldn’t process are stored in the destination S3 bucket under the prefix you specify during delivery stream setup.
  4. All the logs for the delivery stream and Lambda function are stored in Amazon CloudWatch log groups.

Prerequisites

As a prerequisite, you need to create the target S3 bucket before creating the Kinesis Data Firehose delivery stream.

If using a Windows computer, you need PowerShell; if using a Mac, you need Terminal to run AWS Command Line Interface (AWS CLI) commands. To install the latest version of the AWS CLI, refer to Installing or updating the latest version of the AWS CLI.

Create a Lambda function

You can download the Lambda function code from the GitHub repo used in this solution. The example in this post assumes you are enabling all the available fields in the VPC flow logs. You can use it as is or customize per your needs. For example, if you intend to use the default fields when enabling the VPC flow logs, you need to modify the Lambda function with the respective fields. Creating this function creates an AWS Identity and Access Management (IAM) Lambda execution role.

To create your Lambda function, complete the following steps:

  1. On the Lambda console, choose Functions in the navigation pane.
  2. Choose Create function.
  3. Select Author from scratch.
  4. For Function name, enter a name.
  5. For Runtime, choose Python 3.8.
  6. For Architecture, select x86_64.
  7. For Execution role, select Create a new role with basic Lambda permissions.
  8. Choose Create function.

Create Lambda Function

You can then see code source page, as shown in the following screenshot, with the default code in the lambda_function.py file.

  1. Delete the default code and enter the code from the GitHub Lambda function aws-vpc-flowlogs-enricher.py.
  2. Choose Deploy.

VPC Flow Logs Enricher function

To enrich the flow logs with additional tag information, you need to create an additional IAM policy to give Lambda permission to describe tags on resources from the VPC flow logs.

  1. On the IAM console, choose Policies in the navigation pane.
  2. Choose Create policy.
  3. On the JSON tab, enter the JSON code as shown in the following screenshot.

This policy gives the Lambda function permission to retrieve tags for the source and destination IP and retrieve the VPC ID, subnet ID, and other relevant metadata for the destination IP from your VPC flow log record.

  1. Choose Next: Tags.

Tags

  1. Add any tags and choose Next: Review.

  1. For Name, enter vpcfl-describe-tag-policy.
  2. For Description, enter a description.
  3. Choose Create policy.

Create IAM Policy

  1. Navigate to the previously created Lambda function and choose Permissions in the navigation pane.
  2. Choose the role that was created by Lambda function.

A page opens in a new tab.

  1. On the Add permissions menu, choose Attach policies.

Add Permissions

  1. Search for the vpcfl-describe-tag-policy you just created.
  2. Select the vpcfl-describe-tag-policy and choose Attach policies.

Create the Kinesis Data Firehose delivery stream

To create your delivery stream, complete the following steps:

  1. On the Kinesis Data Firehose console, choose Create delivery stream.
  2. For Source, choose Direct PUT.
  3. For Destination, choose Amazon S3.

Kinesis Firehose Stream Source and Destination

After you choose Amazon S3 for Destination, the Transform and convert records section appears.

  1. For Data transformation, select Enable.
  2. Browse and choose the Lambda function you created earlier.
  3. You can customize the buffer size as needed.

This impacts on how many records the delivery stream will buffer before it flushes it to Amazon S3.

  1. You can also customize the buffer interval as needed.

This impacts how long (in seconds) the delivery stream will buffer the incoming records from the VPC.

  1. Optionally, you can enable Record format conversion.

If you want to query from Athena, it’s recommended to convert it to Apache Parquet or ORC and compress the files with available compression algorithms, such as gzip and snappy. For more performance tips, refer to Top 10 Performance Tuning Tips for Amazon Athena. In this post, record format conversion is disabled.

Transform and Conver records

  1. For S3 bucket, choose Browse and choose the S3 bucket you created as a prerequisite to store the flow logs.
  2. Optionally, you can specify the S3 bucket prefix. The following expression creates a Hive-style partition for year, month, and day:

AWSLogs/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/

  1. Optionally, you can enable dynamic partitioning.

Dynamic partitioning enables you to create targeted datasets by partitioning streaming S3 data based on partitioning keys. The right partitioning can help you to save costs related to the amount of data that is scanned by analytics services like Athena. For more information, see Kinesis Data Firehose now supports dynamic partitioning to Amazon S3.

Note that you can enable dynamic partitioning only when you create a new delivery stream. You can’t enable dynamic partitioning for an existing delivery stream.

Destination Settings

  1. Expand Buffer hints, compression and encryption.
  2. Set the buffer size to 128 and buffer interval to 900 for best performance.
  3. For Compression for data records, select GZIP.

S3 Buffer settings

Create a VPC flow log subscription

Now you create a VPC flow log subscription for the Kinesis Data Firehose delivery stream you created.

Navigate to AWS CloudShell or Terminal/PowerShell for a Mac or Windows computer and run the following AWS CLI command to enable the subscription. Provide your VPC ID for the parameter --resource-ids and delivery stream ARN for the parameter --log-destination.

aws ec2 create-flow-logs \ 
--resource-type VPC \ 
--resource-ids vpc-0000012345f123400d \ 
--traffic-type ALL \ 
--log-destination-type kinesis-data-firehose \ 
--log-destination arn:aws:firehose:us-east-1:123456789101:deliverystream/PUT-Kinesis-Demo-Stream \ 
--max-aggregation-interval 60 \ 
--log-format '${account-id} ${action} ${az-id} ${bytes} ${dstaddr} ${dstport} ${end} ${flow-direction} ${instance-id} ${interface-id} ${log-status} ${packets} ${pkt-dst-aws-service} ${pkt-dstaddr} ${pkt-src-aws-service} ${pkt-srcaddr} ${protocol} ${region} ${srcaddr} ${srcport} ${start} ${sublocation-id} ${sublocation-type} ${subnet-id} ${tcp-flags} ${traffic-path} ${type} ${version} ${vpc-id}'

If you’re running CloudShell for the first time, it will take a few seconds to prepare the environment to run.

After you successfully enable the subscription for your VPC flow logs, it takes a few minutes depending on the intervals mentioned in the setup to create the log record files in the destination S3 folder.

To view those files, navigate to the Amazon S3 console and choose the bucket storing the flow logs. You should see the compressed interval logs, as shown in the following screenshot.

S3 destination bucket

You can download any file from the destination S3 bucket on your computer. Then extract the gzip file and view it in your favorite text editor.

The following is a sample enriched flow log record, with the new fields in bold providing added context and metadata of the source and destination IP addresses:

{'account-id': '123456789101',
 'action': 'ACCEPT',
 'az-id': 'use1-az2',
 'bytes': '7251',
 'dstaddr': '10.10.10.10',
 'dstport': '52942',
 'end': '1661285182',
 'flow-direction': 'ingress',
 'instance-id': 'i-123456789',
 'interface-id': 'eni-0123a456b789d',
 'log-status': 'OK',
 'packets': '25',
 'pkt-dst-aws-service': '-',
 'pkt-dstaddr': '10.10.10.11',
 'pkt-src-aws-service': 'AMAZON',
 'pkt-srcaddr': '52.52.52.152',
 'protocol': '6',
 'region': 'us-east-1',
 'srcaddr': '52.52.52.152',
 'srcport': '443',
 'start': '1661285124',
 'sublocation-id': '-',
 'sublocation-type': '-',
 'subnet-id': 'subnet-01eb23eb4fe5c6bd7',
 'tcp-flags': '19',
 'traffic-path': '-',
 'type': 'IPv4',
 'version': '5',
 'vpc-id': 'vpc-0123a456b789d',
 'src-tag-Name': 'test-traffic-ec2-1', 'src-tag-project': ‘Log Analytics’, 'src-tag-team': 'Engineering', 'dst-tag-Name': 'test-traffic-ec2-1', 'dst-tag-project': ‘Log Analytics’, 'dst-tag-team': 'Engineering', 'dst-vpc-id': 'vpc-0bf974690f763100d', 'dst-az-id': 'us-east-1a', 'dst-subnet-id': 'subnet-01eb23eb4fe5c6bd7', 'dst-interface-id': 'eni-01eb23eb4fe5c6bd7', 'dst-instance-id': 'i-06be6f86af0353293'}

Create an Athena database and AWS Glue crawler

Now that you have enriched the VPC flow logs and stored them in Amazon S3, the next step is to create the Athena database and table to query the data. You first create an AWS Glue crawler to infer the schema from the log files in Amazon S3.

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Choose Create crawler.

Glue Crawler

  1. For Name¸ enter a name for the crawler.
  2. For Description, enter an optional description.
  3. Choose Next.

Glue Crawler properties

  1. Choose Add a data source.
  2. For Data source¸ choose S3.
  3. For S3 path, provide the path of the flow logs bucket.
  4. Select Crawl all sub-folders.
  5. Choose Add an S3 data source.

Add Data source

  1. Choose Next.

Data source classifiers

  1. Choose Create new IAM role.
  2. Enter a role name.
  3. Choose Next.

Configure security settings

  1. Choose Add database.
  2. For Name, enter a database name.
  3. For Description, enter an optional description.
  4. Choose Create database.

Create Database

  1. On the previous tab for the AWS Glue crawler setup, for Target database, choose the newly created database.
  2. Choose Next.

Set output and scheduling

  1. Review the configuration and choose Create crawler.

Create crawler

  1. On the Crawlers page, select the crawler you created and choose Run.

Run crawler

You can rerun this crawler when new tags are added to your AWS resources, so that they’re available for you to query from the Athena database.

Run Athena queries

Now you’re ready to query the enriched VPC flow logs from Athena.

  1. On the Athena console, open the query editor.
  2. For Database, choose the database you created.
  3. Enter the query as shown in the following screenshot and choose Run.

Athena query

The following code shows some of the sample queries you can run:

Select * from awslogs where "dst-az-id"='us-east-1a'
Select * from awslogs where "src-tag-project"='Log Analytics' or "dst-tag-team"='Engineering' 
Select "srcaddr", "srcport", "dstaddr", "dstport", "region", "az-id", "dst-az-id", "flow-direction" from awslogs where "az-id"='use1-az2' and "dst-az-id"='us-east-1a'

The following screenshot shows an example query result of the source Availability Zone to the destination Availability Zone traffic.

Athena query result

You can also visualize various charts for the flow logs stored in the S3 bucket via QuickSight. For more information, refer to Analyzing VPC Flow Logs using Amazon Athena, and Amazon QuickSight.

Pricing

For pricing details, refer to Amazon Kinesis Data Firehose pricing.

Clean up

To clean up your resources, complete the following steps:

  1. Delete the Kinesis Data Firehose delivery stream and associated IAM role and policies.
  2. Delete the target S3 bucket.
  3. Delete the VPC flow log subscription.
  4. Delete the Lambda function and associated IAM role and policy.

Conclusion

This post provided a complete serverless solution architecture for enriching VPC flow log records with additional information like resource tags using a Kinesis Data Firehose delivery stream and Lambda function to process logs to enrich with metadata and store in a target S3 file. This solution can help you query, analyze, and visualize VPC flow logs with relevant application metadata because resource tags have been assigned to resources that are available in the logs. This meaningful information associated with each log record wherever the tags are available makes it easy to associate log information to your application.

We encourage you to follow the steps provided in this post to create a delivery stream, integrate with your VPC flow logs, and create a Lambda function to enrich the flow log records with additional metadata to more easily understand and analyze traffic patterns in your environment.


About the Authors

Chaitanya Shah is a Sr. Technical Account Manager with AWS, based out of New York. He has over 22 years of experience working with enterprise customers. He loves to code and actively contributes to AWS solutions labs to help customers solve complex problems. He provides guidance to AWS customers on best practices for their AWS Cloud migrations. He is also specialized in AWS data transfer and in the data and analytics domain.

Vaibhav Katkade is a Senior Product Manager in the Amazon VPC team. He is interested in areas of network security and cloud networking operations. Outside of work, he enjoys cooking and the outdoors.

How a blockchain startup built a prototype solution to solve the need of analytics for decentralized applications with AWS Data Lab

Post Syndicated from Dr. Quan Hoang Nguyen original https://aws.amazon.com/blogs/big-data/how-a-blockchain-startup-built-a-prototype-solution-to-solve-the-need-of-analytics-for-decentralized-applications-with-aws-data-lab/

This post is co-written with Dr. Quan Hoang Nguyen, CTO at Fantom Foundation.

Here at Fantom Foundation (Fantom), we have developed a high performance, highly scalable, and secure smart contract platform. It’s designed to overcome limitations of the previous generation of blockchain platforms. The Fantom platform is permissionless, decentralized, and open source. The majority of decentralized applications (dApps) hosted on the Fantom platform lack an analytics page that provides information to the users. Therefore, we would like to build a data platform that supports a web interface that will be made public. This will allow users to search for a smart contract address. The application then displays key metrics for that smart contract. Such an analytics platform can give insights and trends for applications deployed on the platform to the users, while the developers can continue to focus on improving their dApps.

AWS Data Lab offers accelerated, joint-engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data and analytics modernization initiatives. Data Lab has three offerings: the Build Lab, the Design Lab, and a Resident Architect. The Build Lab is a 2–5 day intensive build with a technical customer team. The Design Lab is a half-day to 2-day engagement for customers who need a real-world architecture recommendation based on AWS expertise, but aren’t yet ready to build. Both engagements are hosted either online or at an in-person AWS Data Lab hub. The Resident Architect provides AWS customers with technical and strategic guidance in refining, implementing, and accelerating their data strategy and solutions over a 6-month engagement.

In this post, we share the experience of our engagement with AWS Data Lab to accelerate the initiative of developing a data pipeline from an idea to a solution. Over 4 weeks, we conducted technical design sessions, reviewed architecture options, and built the proof of concept data pipeline.

Use case review

The process started with us engaging with our AWS Account team to submit a nomination for the data lab. This followed by a call with the AWS Data Lab team to assess the suitability of requirements against the program. After the Build Lab was scheduled, an AWS Data Lab Architect engaged with us to conduct a series of pre-lab calls to finalize the scope, architecture, goals, and success criteria for the lab. The scope was to design a data pipeline that would ingest and store historical and real-time on-chain transactions data, and build a data pipeline to generate key metrics. Once ingested, data should be transformed, stored, and exposed via REST-based APIs and consumed by a web UI to display key metrics. For this Build Lab, we choose to ingest data for Spooky, which is a decentralized exchange (DEX) deployed on the Fantom platform and had the largest Total Value Locked (TVL) at that time. Key metrics such number of wallets that have interacted with the dApp over time, number of tokens and their value exchanged for the dApp over time, and number of transactions for the dApp over time were selected to visualize through a web-based UI.

We explored several architecture options and picked one for the lab that aligned closely with our end goal. The total historical data for the selected smart contract was approximately 1 GB since deployment of dApp on the Fantom platform. We used FTMScan, which allows us to explore and search on the Fantom platform for transactions, to estimate the rate of transfer transactions to be approximately three to four per minute. This allowed us to design an architecture for the lab that can handle this data ingestion rate. We agreed to use an existing application known as the data producer that was developed internally by the Fantom team to ingest on-chain transactions in real time. On checking transactions’ payload size, it was found to not exceed 100 kb for each transaction, which gave us the measure of number of files that will be created once ingested through the data producer application. A decision was made to ingest the past 45 days of historic transactions to populate the platform with enough data to visualize key metrics. Because the feature of backdating exists within the data producer application, we agreed to use that. The Data Lab Architect also advised us to consider using AWS Database Migration Service (AWS DMS) to ingest historic transactions data post lab. As a last step, we decided to build a React-based webpage with Material-UI that allows users to enter a smart contract address and choose the time interval, and the app fetches the necessary data to show the metrics value.

Solution overview

We collectively agreed to incorporate the following design principles for the data lab architecture:

  • Simplified data pipelines
  • Decentralized data architecture
  • Minimize latency as much as possible

The following diagram illustrates the architecture that we built in the lab.

We collectively defined the following success criteria for the Build Lab:

  • End-to-end data streaming pipeline to ingest on-chain transactions
  • Historical data ingestion of the selected smart contract
  • Data storage and processing of on-chain transactions
  • REST-based APIs to provide time-based metrics for the three defined use cases
  • A sample web UI to display aggregated metrics for the smart contract

Prior to the Build Lab

As a prerequisite for the lab, we configured the data producer application to use the AWS Software Development Kit (AWS SDK) and PUTRecords API operation to send transactions data to an Amazon Simple Storage Service (Amazon S3) bucket. For the Build Lab, we built additional logic within the application to ingest historic transactions data together with real-time transactions data. As a last step, we verified that transactions data was captured and ingested into a test S3 bucket.

AWS services used in the lab

We used the following AWS services as part of the lab:

  • AWS Identity and Access Management (IAM) – We created multiple IAM roles with appropriate trust relationships and necessary permissions that can be used by multiple services to read and write on-chain transactions data and generated logs.
  • Amazon S3 – We created an S3 bucket to store the incoming transactions data as JSON-based files. We created a separate S3 bucket to store incoming transaction data that failed to be transformed and will be reprocessed later.
  • Amazon Kinesis Data Streams – We created a new Kinesis data stream in on-demand mode, which automatically scales based on data ingestion patterns and provides hands-free capacity management. This stream was used by the data producer application to ingest historical and real-time on-chain transactions. We discussed having the ability to manage and predict cost, and therefore were advised to use the provisioned mode when reliable estimates were available for throughput requirements. We were also advised to continue to use on-demand mode until the data traffic patterns were unpredictable.
  • Amazon Kinesis Data Firehose – We created a Firehose delivery stream to transform the incoming data and writes it to the S3 bucket. To minimize latency, we set the delivery stream buffer size to 1 MiB and buffer interval to 60 seconds. This would ensure a file is written to the S3 bucket when either of the two conditions are satisfied regardless of the order. Transactions data written to the S3 bucket was in JSON Lines format.
  • Amazon Simple Queue Service (Amazon SQS) – We set up an SQS queue of the type Standard and an access policy for that SQS queue to allow incoming messages generated from S3 bucket event notifications.
  • Amazon DynamoDB – In order to pick a data store for on-chain transactions, we needed a service that can store transactions payload of unstructured data with varying schemas, provides the ability to cache query results, and is a managed service. We picked DynamoDB for those reasons. We created a single DynamoDB table that holds the incoming transactions data. After analyzing the access query patterns, we decided to use the address field of the smart contract as the partition key and the timestamp field as the sort key. The table was created with auto scaling of read and write capacity modes because the actual usage requirements would be hard to predict at that time.
  • AWS Lambda – We created the following functions:
    • A Python-based Lambda function to perform transformations on the incoming data from the data producer application to flatten the JSON structure, convert the Unix-based epoch timestamp to a date/time value, and convert hex-based string values to a decimal value representing the number of tokens.
    • A second Lambda function to parse incoming SQS queue messages. This message contained values for bucket_name and object_key, which holds the reference to a newly created object within the S3 bucket. The Lambda function logic included parsing of this value to obtain the reference to the S3 object, get the contents of the object, read it into a data frame object using the AWS SDK for pandas (awswrangler) library, convert it into a Pandas data frame object, and use the put_df API call to write a Pandas data frame object as an item into a DynamoDB table. We choose to use Pandas due to familiarity with the library and functions required to perform data transform operations.
    • Three separate Lambda functions that contains the logic to query the DynamoDB table and retrieve items to aggregate and calculate metrics values. This calculated metrics value within the Lambda function was formatted as an HTTP response to expose as REST-based APIs.
  • Amazon API Gateway – We created a REST based API endpoint that uses Lambda proxy integration to pass a smart contract address and time-based interval in minutes as a query string parameter to the backend Lambda function. The response from the Lambda function was a metrics value. We also enabled cross-origin resource sharing (CORS) support within API Gateway to successfully query from the web UI that resides in a different domain.
  • Amazon CloudWatch – We used a Lambda function in-built mechanism to send function metrics to CloudWatch. Lambda functions come with a CloudWatch Logs log group and a log stream for each instance of your function. The Lambda runtime environment sends details of each invocation to the log stream, and relays logs and other output from your function’s code.

Iterative development approach

Across 4 days of the Build Lab, we undertook iterative development. We started by developing the foundational layer and iteratively added extra features through testing and data validation. This allowed us to develop confidence of the solution being built as we tested the output of the metrics through a web-based UI and verified with the actual data. As errors got discovered, we deleted the entire dataset and reran all the jobs to verify results and resolve those errors.

Lab outcomes

In 4 days, we built an end-to-end streaming pipeline ingesting 45 days of historical data and real-time on-chain transactions data for the selected Spooky smart contract. We also developed three REST-based APIs for the selected metrics and a sample web UI that allows users to insert a smart contract address, choose a time frequency, and visualize the metrics values. In a follow-up call, our AWS Data Lab Architect shared post-lab guidance around the next steps required to productionize the solution:

  • Scaling of the proof of concept to handle larger data volumes
  • Security best practices to protect the data while at rest and in transit
  • Best practices for data modeling and storage
  • Building an automated resilience technique to handle failed processing of the transactions data
  • Incorporating high availability and disaster recovery solutions to handle incoming data requests, including adding of the caching layer

Conclusion

Through a short engagement and small team, we accelerated this project from an idea to a solution. This experience gave us the opportunity to explore AWS services and their analytical capabilities in-depth. As a next step, we will continue to take advantage of AWS teams to enhance the solution built during this lab to make it ready for the production deployment.

Learn more about how the AWS Data Lab can help your data and analytics on the cloud journey.


About the Authors

Dr. Quan Hoang Nguyen is currently a CTO at Fantom Foundation. His interests include DLT, blockchain technologies, visual analytics, compiler optimization, and transactional memory. He has experience in R&D at the University of Sydney, IBM, Capital Markets CRC, Smarts – NASDAQ, and National ICT Australia (NICTA).

Ankit Patira is a Data Lab Architect at AWS based in Melbourne, Australia.

Ingest VPC flow logs into Splunk using Amazon Kinesis Data Firehose

Post Syndicated from Ranjit Kalidasan original https://aws.amazon.com/blogs/big-data/ingest-vpc-flow-logs-into-splunk-using-amazon-kinesis-data-firehose/

In September 2017, during the annual Splunk.conf, Splunk and AWS jointly announced Amazon Kinesis Data Firehose integration to support Splunk Enterprise and Splunk Cloud as a delivery destination. This native integration between Splunk Enterprise, Splunk Cloud, and Kinesis Data Firehose is designed to make AWS data ingestion setup seamless, while offering a secure and fault-tolerant delivery mechanism. We want to enable you to monitor and analyze machine data from any source and use it to deliver operational intelligence and optimize IT, security, and business performance.

With Kinesis Data Firehose, you can use a fully managed, reliable, and scalable data streaming solution to Splunk. In September 2022, AWS announced a new Amazon Virtual Private Cloud (Amazon VPC) feature that enables you to create VPC flow logs to send the flow log data directly into Kinesis Data Firehose as a destination. Previously, you could send VPC flow logs to either Amazon CloudWatch Logs or Amazon Simple Storage Service (Amazon S3) before it was ingested by other AWS or Partner tools. In this post, we show you how to use this feature to set up VPC flow logs for ingesting into Splunk using Kinesis Data Firehose.

Overview of solution

We deploy the following architecture to ingest data into Splunk.

We create a VPC flow log in an existing VPC to send the flow log data to a Kinesis Data Firehose delivery stream. This delivery stream has an AWS Lambda function enabled for data transformation and has destination settings to point to the Splunk endpoint along with an HTTP Event Collector (HEC) token.

Prerequisites

Before you begin, ensure that you have the following prerequisites:

  • AWS account – If you don’t have an AWS account, you can create one. For more information, see Setting Up for Amazon Kinesis Data Firehose.
  • Splunk AWS Add-on – Ensure you install the Splunk AWS Add-on app from Splunkbase in your Splunk deployment. This app provides the required source types and event types mapping to AWS machine data.
  • HEC token – In your Splunk deployment, set up an HEC token with the source type aws:cloudwatchlogs:vpcflow.

Create the transformation Lambda function

Integrating VPC flow logs with Kinesis Data Firehose requires a Lambda function to transform the flow log records. The data that VPC flow logs sends to the delivery stream is encoded as JSON records. However, Splunk expects this as raw flow log data. Therefore, when you create the delivery stream, you enable data transformation and configure a Lambda function to transform the flow log data to raw format. Kinesis Data Firehose then sends the data in raw format to Splunk.

You can deploy this transformation Lambda function as a serverless application from the Lambda serverless app repository on the Lambda console. The name of this application is splunk-firehose-flowlogs-processor.

After it’s deployed, you can see a Lambda function and an AWS Identity and Access Management (IAM) role getting deployed on the console. Note the physical ID of the Lambda function; you use this when you create the Firehose delivery stream in the next step.

Create a Kinesis Data Firehose delivery stream

In this step, you create a Kinesis Data Firehose delivery stream to receive the VPC flow log data and deliver that data to Splunk.

  1. On the Kinesis Data Firehose console, create a new delivery stream.
  2. For Source, choose Direct PUT.
  3. For Destination, choose Splunk.
  4. For Delivery stream name, enter a name (for example, VPCtoSplunkStream).
  5. In the Transform records section, for Data transformation, select Enabled.
  6. For AWS Lambda function, choose Browse.
  7. Select the function you created earlier by looking for the physical ID.
  8. Choose Choose.
  9. In the Destination settings section, for Splunk cluster endpoint, enter your endpoint.If you’re using a Splunk Cloud endpoint, refer to Configure Amazon Kinesis Firehose to send data to the Splunk platform for different Splunk cluster endpoint values.
  10. For Splunk endpoint type, select Raw endpoint.
  11. For Authentication token, enter the value of your Splunk HEC that you created as a prerequisite.
  12. In the Backup settings section, for Source record backup in Amazon S3, select Failed events only so you only save the data that fails to be ingested into Splunk.
  13. For S3 backup bucket, enter the path to an S3 bucket.
  14. Complete creating your delivery stream.

The creation process may take a few minutes to complete.

Create a VPC flow log

In this final step, you create a VPC flow log with Kinesis Data Firehose as destination type.

  1. On the Amazon VPC console, choose Your VPCs.
  2. Select the VPC for which to create the flow log.
  3. On the Actions menu, choose Create flow log.
  4. Provide the required settings for Filter:
    1. If you want to filter the flow logs, select Accept traffic or Reject traffic.
    2. Select All if you need all the information sent to Splunk.
  5. For Maximum aggregation interval, select a suitable interval for your use case.Select the minimum setting of 1 minute interval if you need the flow log data to be available for near-real-time analysis in Splunk.
  6. For Destination, select Send to Kinesis Firehose in the same account if the delivery stream is set up on the same account where you create the VPC flow logs.If you want to send the data to a different account, refer to Publish flow logs to Kinesis Data Firehose.
  7. For Log record format, if you leave it at AWS default format, the flow logs are sent as version 2 format. Alternatively, you can specify which fields you need to be captured and sent to Splunk.For more information on log format and available fields, refer to Flow log records.
  8. Review all the parameters and create the flow log.Within a few minutes, you should be able to see the data in Splunk.
  9. Open your Splunk console and navigate to the Search tab of the Search & Reporting app.
  10. Run the following SPL query to look at sample VPC flow log records:
    index=<index name> sourcetype="aws:cloudwatchlogs:vpcflow"

Clean up

To avoid incurring future charges, delete the resources you created in the following order:

  1. Delete the VPC flow log.
  2. Delete the Kinesis Data Firehose delivery stream.
  3. Delete the serverless application to delete the transformation Lambda function.
  4. If you created a new VPC and new resources in the VPC, then delete the resources and VPC.

Conclusion

You can use VPC flow log data in multiple Splunk solutions, like the Splunk App for AWS Security Dashboards for traffic analysis or Splunk Security Essentials, which uses the data to provide deeper insights into the security posture of your AWS environment. Using Kinesis Data Firehose to send VPC flow log data into Splunk provides many benefits. This managed service can automatically scale to meet the data demand and provide near-real-time data analysis. Try out this new quick and hassle-free way of sending your VPC flow logs to Splunk Enterprise or Splunk Cloud Platform using Kinesis Data Firehose.

You can deploy this solution today on your AWS account by following the Kinesis Data Firehose Immersion Day Lab for Splunk


About the authors

Ranjit Kalidasan is a Senior Solutions Architect with Amazon Web Services based in Boston, Massachusetts. He is Partner Solutions Architect helping security ISV partners to co-build and co-market solutions with AWS. He brings over 20 years of experience in Information technology helping global customers implement complex solutions for Security & Analytics. You can connect with Ranjit in Linkedin.