All posts by Mahesh Goyal

Streaming data from Amazon S3 to Amazon Kinesis Data Streams using AWS DMS

Post Syndicated from Mahesh Goyal original https://aws.amazon.com/blogs/big-data/streaming-data-from-amazon-s3-to-amazon-kinesis-data-streams-using-aws-dms/

Stream processing is useful in use cases where you need to detect a problem quickly and improve the outcome based on data, such as production line monitoring or supply chain optimization.

This blog post walks you through the process of streaming existing data files and ongoing changes from Amazon Simple Storage Service (Amazon S3) to Amazon Kinesis. You achieve this by using AWS Database Migration Service (AWS DMS). AWS DMS enables you to seamlessly migrate data from supported sources to relational databases, data warehouses, streaming platforms, and other data stores in the AWS Cloud.

Many SaaS and third-party applications already integrate with Amazon S3 and can deliver records to S3 buckets. In certain use cases, you need to further process this data in near-real time to generate alerts. Use cases like threat detection and application monitoring require generating insights in seconds. Waiting for batch processes delays data analysis and reduces the ability of systems to respond quickly to critical situations. For such use cases, you need a way to convert batch processing to stream processing by extending your applications' existing integrations with Amazon S3.

You can use AWS DMS for such data-processing requirements. AWS DMS lets you extend your application's existing integration with Amazon S3 to produce data in Amazon Kinesis Data Streams for real-time analytics without writing and maintaining new code. AWS DMS supports specifying Amazon S3 as the source and streaming services like Kinesis and Amazon Managed Streaming for Apache Kafka (Amazon MSK) as the target. AWS DMS allows migration of full load and change data capture (CDC) files to these services. AWS DMS performs this task out of the box without any complex configuration or code development. You can also configure an AWS DMS replication instance to scale up or down depending on the workload.

For this post, we focus on streaming data to Kinesis. We deploy an AWS CloudFormation template to get started in minutes and explore the streaming pipeline.

Architecture overview

Third-party applications such as web, API, and data-integration services produce data and log files in S3 buckets. Data lakes built on AWS process and store data in Amazon S3 at different stages. AWS DMS supports Amazon S3 as the source and Kinesis as the target, so data stored in an S3 bucket is streamed to Kinesis. Several consumers, such as AWS Lambda, Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, and the Kinesis Client Library (KCL), can consume the data concurrently to perform real-time analytics on the dataset. Each AWS service in this architecture can scale independently as needed.

The following diagram shows the architecture of this solution.

Deploying AWS CloudFormation

To get started, you first deploy the CloudFormation template to create the core components of the architecture. AWS CloudFormation automates the deployment of technology and infrastructure in a safe and repeatable manner across multiple Regions and accounts with the least amount of effort and time. To create these resources, complete the following steps:

  1. Sign in to the AWS Management Console and choose the us-west-2 Region.
  2. Choose Launch Stack:
  3. Choose Next.

This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the CloudFormation template on the console.

  1. For Stack name, enter a stack name.
  2. On the next screen, choose your VPC and subnet IDs.
  3. For Does DMS VPC and Cloudwatch role Exists?, enter Y if the managed AWS Identity and Access Management (IAM) roles dms-vpc-role and dms-cloudwatch-logs-role exist in your account. Otherwise, leave at the default N.

If you want to deploy the AWS DMS endpoint in a private subnet, enable the VPC endpoints for Kinesis and Amazon S3 before deploying the template.

  1. Choose Next.
  2. Acknowledge resource creation under Capabilities on the final screen and choose Create.

The stack takes 5–10 minutes to complete.

The files required for this demo don’t come with the template. Download blog_sample_file.zip and upload it to the source bucket before starting the AWS DMS task.

Using Amazon S3 as the source

When you use Amazon S3 as the source, the data files (full load and CDC) must be in comma-separated value (CSV) format.

In addition to the data files, AWS DMS also requires an external table definition. An external table definition is a JSON document that describes how AWS DMS should interpret the data from Amazon S3.
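As a rough illustration of what such a definition contains, the following Python sketch loads a trimmed definition and checks that its declared counts agree with its contents. The field names come from the table definition used in this post; the check itself (`find_count_mismatches`) is a hypothetical helper, not a validation that AWS DMS is documented to perform.

```python
import json

# Trimmed, illustrative definition that reuses the field names from this post.
SAMPLE_DEFINITION = json.loads("""
{
    "TableCount": "1",
    "Tables": [{
        "TableName": "table01",
        "TablePath": "schema01/table01/",
        "TableOwner": "schema01",
        "TableColumns": [
            {"ColumnName": "ingest_time", "ColumnType": "TIMESTAMP",
             "ColumnNullable": "false", "ColumnIsPk": "true"},
            {"ColumnName": "id", "ColumnType": "INT8"}
        ],
        "TableColumnsTotal": "2"
    }]
}
""")


def find_count_mismatches(definition):
    """Return a list of problems; an empty list means the declared counts agree."""
    problems = []
    tables = definition.get("Tables", [])
    if int(definition.get("TableCount", -1)) != len(tables):
        problems.append("TableCount does not match the number of tables")
    for table in tables:
        columns = table.get("TableColumns", [])
        if int(table.get("TableColumnsTotal", -1)) != len(columns):
            name = table.get("TableName", "<unnamed>")
            problems.append(f"{name}: TableColumnsTotal does not match the column list")
    return problems


print(find_count_mismatches(SAMPLE_DEFINITION))
```

Running a check like this before creating the source endpoint can catch a stale count after a column is added or removed.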

Amazon S3 file paths for full load and CDC files are required for AWS DMS to run the task. Make sure that file names are sequentially numbered so that the data is replicated in the correct order. In addition, AWS DMS allows you to specify the column delimiter, row delimiter, and other parameters using extra connection attributes.

AWS DMS can identify the operation to perform for each load record in two ways: from the record’s keyword value (for example, INSERT) or from the keyword’s initial character (for example, I).

For more information, see Using Amazon S3 as a source for AWS DMS.
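To make the CDC record layout concrete, here is a minimal Python sketch that splits one CDC line into its parts, assuming the column order described in the walkthrough of this post (operation, table name, schema name, then the data columns). The function name `parse_cdc_line`, the sample row, and the choice to accept lowercase operation values are assumptions for illustration only.

```python
import csv
import io

# Map both the keyword and its initial character to one canonical operation.
OPERATIONS = {
    "INSERT": "INSERT", "I": "INSERT",
    "UPDATE": "UPDATE", "U": "UPDATE",
    "DELETE": "DELETE", "D": "DELETE",
}


def parse_cdc_line(line):
    """Split one CSV CDC line into operation, table, schema, and data columns."""
    row = next(csv.reader(io.StringIO(line)))
    if len(row) < 4:
        raise ValueError("expected operation, table, schema, and at least one data column")
    operation, table, schema, *data = row
    try:
        canonical = OPERATIONS[operation.strip().upper()]
    except KeyError:
        raise ValueError(f"unknown operation: {operation!r}") from None
    return {"operation": canonical, "table": table, "schema": schema, "data": data}


# Made-up sample row shaped like the table definition used in this post.
print(parse_cdc_line("I,table01,schema01,2020-08-03 22:05:57,10.1000/xyz,42,12.50,abc"))
```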

Using Amazon Kinesis as the target

AWS DMS publishes records to a Kinesis data stream as JSON. During conversion, AWS DMS serializes each record from the source Amazon S3 files into an attribute-value pair in JSON format.

AWS DMS publishes each record in the source Amazon S3 file as one JSON data record in a data stream regardless of the action specified in the source file.

Additionally, AWS DMS allows object mapping to migrate data from source files to a data stream. Object mapping determines the structure of data records in the stream.

AWS DMS also supports multi-threaded migration for full load and CDC with task settings. You can improve performance by setting multiple threads, buffer size, and parallel queues.
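The following Python sketch shows what such a task-settings fragment might look like when generated as JSON. The setting names under `TargetMetadata` are given as I recall them from the AWS DMS task settings documentation, and the values are arbitrary placeholders rather than recommendations; verify both against the current documentation before use.

```python
import json

# Illustrative values only; setting names should be checked against the DMS docs.
task_settings = {
    "TargetMetadata": {
        # Full load: threads, records buffered per batch, and queues per thread.
        "ParallelLoadThreads": 8,
        "ParallelLoadBufferSize": 100,
        "ParallelLoadQueuesPerThread": 2,
        # CDC: the equivalent settings for applying ongoing changes.
        "ParallelApplyThreads": 8,
        "ParallelApplyBufferSize": 100,
        "ParallelApplyQueuesPerThread": 2,
    }
}

TASK_SETTINGS_JSON = json.dumps(task_settings, indent=4)
print(TASK_SETTINGS_JSON)
```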

For more information, see Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service.

Walkthrough

The AWS CloudFormation deployment takes care of all the infrastructure. Now you need files to complete this use case.

  1. Download blog_sample_file.zip, which contains full and CDC load files in CSV format.

If your source files aren’t in CSV, convert the file format to CSV. One conversion method is by using AWS Glue. For more information, see Format Options for ETL Inputs and Outputs in AWS Glue.

The following screenshot shows the sample records of the full load files that you use for this use case.

CDC files require additional attributes for AWS DMS to identify the action, table, and schema.

  1. Reformat the files as follows:
  • Operation – The change operation to be performed: INSERT or I, UPDATE or U, or DELETE or D.
  • Table name – The name of the source table.
  • Schema name – The name of the source schema.
  • Data – One or more columns that represent the data to be changed.

The following screenshot shows sample records of the CDC file.

External table definition is required in the source endpoint configuration. For this post, the definition is embedded in AWS CloudFormation.

  1. Enter the following code for the table definition for the full and CDC files:
    {
        "TableCount": "1",
        "Tables": [{
            "TableName": "table01",
            "TablePath": "schema01/table01/",
            "TableOwner": "schema01",
            "TableColumns": [{
                "ColumnName": "ingest_time",
                "ColumnType": "TIMESTAMP",
                "ColumnNullable": "false",
                "ColumnIsPk": "true"
            }, {
                "ColumnName": "doi",
                "ColumnType": "STRING",
                "ColumnLength": "30"
            }, {
                "ColumnName": "id",
                "ColumnType": "INT8"
            }, {
                "ColumnName": "value",
                "ColumnType": "NUMERIC",
                "ColumnPrecision": "5",
                "ColumnScale": "2"
            }, {
                "ColumnName": "data_sig",
                "ColumnType": "STRING",
                "ColumnLength": "10"
            }],
            "TableColumnsTotal": "5"
        }]
    }
    

  2. Create folder structures under the source S3 bucket created through the CloudFormation template.
    1. Create folders schema01/table01/ for full load and cdcfile/ for CDC data files.
    2. Name the files in incremental order, as shown in the following CLI output.
      $ aws s3 ls s3://blog-xxxxxxxx/schema01/table01 --recursive --human-readable --summarize
      2020-08-03 22:05:57    5.0 MiB schema01/table01/full_000
      2020-08-03 22:05:51    5.0 MiB schema01/table01/full_001
      2020-08-03 22:06:00    5.0 MiB schema01/table01/full_002
      2020-08-03 22:05:56    5.0 MiB schema01/table01/full_003
      2020-08-03 22:05:59    3.1 MiB schema01/table01/full_004
      
      $ aws s3 ls s3://blog-xxxxxxxx/cdcfile --recursive --human-readable --summarize
      2020-08-03 22:06:28    4.8 MiB cdc/cdc_000
      2020-08-03 22:06:28    4.8 MiB cdc/cdc_001
      2020-08-03 22:06:26    4.8 MiB cdc/cdc_002
      2020-08-03 22:06:19    4.8 MiB cdc/cdc_003
      

  3. After the files are copied, on the AWS DMS console, choose Replication.
  4. Validate the instance status and configuration.
  5. Choose Endpoints.
  6. Validate the status and configuration of the Amazon S3 source endpoint and make sure that the connection to the replication instance is successful.
  7. Similarly, validate the status and configuration of Kinesis target endpoint and make sure that the connection to the replication instance is successful.
  8. Choose Database migration task.
  9. Verify that the source and target are mapped correctly.
  10. After validating all the configurations, start the AWS DMS task. Because the task has been created but never started, choose Restart/Resume to begin the full load and CDC.

After data migration starts, you can see it listed under Table statistics. For more information, see How do I use table statistics to monitor an AWS DMS task?

AWS DMS completes the full load first and migrates change data as files are uploaded to the bucket location specified in the cdcPath parameter.

  1. While the migration is in progress, on the Kinesis console, check the IncomingBytes metric on the Monitoring tab to confirm that data is streaming to Kinesis Data Streams.
  2. To confirm that the data streamed is being consumed by the Lambda consumer, use the GetRecords.Bytes metric.

You’re now ready to validate the records in Lambda. Lambda is configured to read from Kinesis through a trigger.

The Lambda consumer for this post is a sample function that consumes the records from the Kinesis data stream, decodes the base64-encoded data, and prints the records to the Amazon CloudWatch log group.
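A consumer of this kind can be sketched in a few lines of Python. This is not the function deployed by the template, only a minimal sketch of the same idea: Lambda delivers Kinesis records under `Records[].kinesis.data` as base64 text, and the handler decodes and prints each one. The helper name `decode_records` and the sample payload are assumptions for illustration.

```python
import base64
import json


def decode_records(event):
    """Return the decoded UTF-8 payload of every Kinesis record in the event."""
    payloads = []
    for record in event.get("Records", []):
        raw = base64.b64decode(record["kinesis"]["data"])
        payloads.append(raw.decode("utf-8"))
    return payloads


def lambda_handler(event, context):
    payloads = decode_records(event)
    for payload in payloads:
        # Anything printed by the function ends up in its CloudWatch log group.
        print(payload)
    return {"recordCount": len(payloads)}


# Local smoke test with a hand-built event; the payload shape is made up.
sample_payload = json.dumps({"data": {"id": 1}, "metadata": {"operation": "load"}})
sample_event = {
    "Records": [
        {"kinesis": {"data": base64.b64encode(sample_payload.encode("utf-8")).decode("ascii")}}
    ]
}
print(lambda_handler(sample_event, None))
```

Because each payload is a JSON string, aggregation or alerting logic can start from `json.loads(payload)` inside the loop.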

  1. On the Monitoring tab, open the recent log stream under CloudWatch Logs Insights to see the printed records.

For more information about monitoring, see Monitoring functions in the AWS Lambda console.

You can add processing logic to the Lambda function as per your requirements to aggregate or process the records. You can also configure a Lambda destination for further processing. Lambda asynchronous invocations can put an event or message on Amazon Simple Notification Service (Amazon SNS), Amazon Simple Queue Service (Amazon SQS), or Amazon EventBridge. For more information, see Introducing AWS Lambda Destinations.

Best practice considerations

When implementing this solution, consider the following best practices:

  • Full load allows you to stream existing data from an S3 bucket to Kinesis. You can use full load to migrate previously stored data before streaming CDC data. The full load data should already exist before the task starts. New CDC files are streamed to Kinesis in real time as they are delivered to the bucket.
  • For loading multiple tables, you can specify the table count and table properties in an external table definition file. The CDC path remains the same and AWS DMS maps the records to tables based on the metadata fields.
  • During a heavy workload, the AWS DMS instance can be constrained by resources such as CPU, memory, storage, and I/O. For optimal transfer speed, monitor the CloudWatch metrics and scale the replication instance.
  • For migrating a large number of tables, you can speed up the transfer by setting the multi-threading parameter to higher values.
  • The CloudFormation template creates a data stream with two shards. As the data flow rate to the stream increases, you can scale the number of shards in the stream to adapt to changes. Monitoring Kinesis with CloudWatch metrics for IncomingRecords and WriteProvisionedThroughputExceeded provides insights on how to scale the shards.
  • Object mapping in the AWS DMS task defines the partition key. This partition key is used to group data by shard within a stream. The default partition key AWS DMS uses is TableName. You can use attribute mapping to change the partition key to a value of one of the fields in the JSON, or the primary key of the table in the source database. You can also set the partition key to a constant value to stream all the data to a single shard in the stream.
  • By default, Lambda invokes the function as soon as records are available in the stream. To avoid invoking the function with a small number of records, configure the event source to buffer records for up to 5 minutes by configuring a batch window. For more information, see Using AWS Lambda with Amazon Kinesis.
  • When Kinesis is configured as a trigger for Lambda, you can increase the concurrency to process multiple batches from each shard in parallel. Lambda can process up to 10 batches in each shard simultaneously. For more information about concurrency, see New AWS Lambda scaling controls for Kinesis and DynamoDB event sources.
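The partition key bullet above can be illustrated with a short Python sketch. Kinesis maps each partition key to a 128-bit integer with MD5 and routes the record to the shard whose hash key range contains that value. The sketch assumes the shards split the hash space evenly, which is an approximation of a freshly created stream and does not hold after uneven splits or merges; the function name `shard_index` and the sample keys are hypothetical.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def shard_index(partition_key, shard_count):
    """Return the zero-based shard a key would land on, assuming even hash ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, "big")
    return hash_key * shard_count // HASH_SPACE


# A constant key always lands on one shard, however many records are sent.
constant_key_shards = {shard_index("table01", 2) for _ in range(1000)}
print(len(constant_key_shards))

# Keys taken from a varied field spread across the shards of the stream.
varied_key_shards = {shard_index(f"id-{i}", 2) for i in range(1000)}
print(sorted(varied_key_shards))
```

This is why a table name or constant partition key concentrates all traffic on one shard, while a high-cardinality field lets additional shards share the load.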

Cleaning up

After successful testing and validation, you should delete all the resources deployed through the CloudFormation template to avoid any unwanted costs. First empty the S3 bucket and stop the AWS DMS task. Then delete the appropriate stacks on the AWS CloudFormation console.

Summary

This post describes a solution for converting batch processing to near-real-time processing using AWS DMS. This solution greatly simplifies the process of migrating records from Amazon S3 to Kinesis for analysis. Kinesis as an AWS DMS target allows multiple systems to consume data simultaneously. Having a near-real-time streaming pipeline allows you to make sense of all the changes as they happen, which ultimately improves your organization’s ability to make decisions. All the resources used in this solution scale seamlessly and allow you to focus on analysis, alerting, reporting, and fraud detection instead of focusing on platform setup and maintenance. This promotes cost-effectiveness while reducing operational burden.


About the Authors

Mahesh Goyal is a Data Architect in Big Data at AWS. He works with customers in their journey to the cloud with a focus on big data and data warehouses. In his spare time, Mahesh likes to listen to music and explore new food places with his family.

Charishma Makineni is a Technical Account Manager at AWS. She works with enterprise customers to help them build secure and scalable solutions on the AWS Cloud. She is focused on big data and analytics technologies. Outside of work, Charishma enjoys being outdoors, gardening, and experimenting with cooking.

Suresh Patnam is a Solutions Architect at AWS. He helps customers innovate on the AWS platform by building highly available, scalable, and secure architectures on Big Data and AI/ML. In his spare time, Suresh enjoys playing tennis and spending time with his family.

Analyzing Amazon S3 server access logs using Amazon ES

Post Syndicated from Mahesh Goyal original https://aws.amazon.com/blogs/big-data/analyzing-amazon-s3-server-access-logs-using-amazon-es/

When you use Amazon Simple Storage Service (Amazon S3) to store corporate data and host websites, you need additional logging to monitor access to your data and the performance of your application. An effective logging solution enhances security and improves the detection of security incidents. As data storage needs grow, you can rely on Amazon S3 for a range of use cases while looking for ways to analyze your logs to ensure compliance, perform audits, and discover risks.

Amazon S3 lets you monitor the traffic using the server access logging feature. With server access logging, you can capture and monitor the traffic to your S3 bucket at any time, with detailed information about the source of the request. The logs are stored in the S3 bucket you own in the same Region. This addresses the security and compliance requirements of most organizations. The logs are critical for establishing baselines, analyzing access patterns, and identifying trends. For example, the logs could answer a financial organization’s question about how many requests are made to a bucket and who is making what type of access requests to the objects.

You can discover insights from server access logs through several different methods. One common option is to use Amazon Athena or Amazon Redshift Spectrum to query the log files stored in Amazon S3. However, this approach suffers from high latency as log volume grows exponentially. It also requires further integration with Amazon QuickSight to add visualization capabilities.

You can address this by using Amazon Elasticsearch Service (Amazon ES). Amazon ES is a managed service that makes it easier to deploy, operate, and scale Elasticsearch clusters in the AWS Cloud. Elasticsearch is a popular open-source search and analytics engine for use cases such as log analytics, real-time application monitoring, and clickstream analysis. The service provides support for open-source Elasticsearch APIs, managed Kibana, and integration with other AWS services such as Amazon S3 and Amazon Kinesis for loading streaming data into Amazon ES.

This post walks you through automating ingestion of server access logs from Amazon S3 into Amazon ES using AWS Lambda and visualizing the data in Kibana.

Architecture overview

Server access logging is enabled on the source buckets, and logs are delivered to the access log bucket. The access log bucket is configured to send an event to the Lambda function when a log file is created. On an event trigger, the Lambda function reads the file, processes the access log, and sends it to Amazon ES. When the logs are available, you can use Kibana to create interactive visuals and analyze the logs over a time period.
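The processing step can be pictured with a small Python sketch that turns one server access log line into a dictionary ready for indexing. It is not the code from the post's GitHub project. The regular expression covers only the leading fields of the log format, the field names other than `bucket` and `requestdatetime` (which the dashboard steps in this post use) are assumptions, and the sample line contains made-up values.

```python
import re
from datetime import datetime

# Matches the leading fields of an S3 server access log line; later fields are ignored.
LOG_PATTERN = re.compile(
    r'(?P<bucketowner>\S+) (?P<bucket>\S+) \[(?P<requestdatetime>[^\]]+)\] '
    r'(?P<remoteip>\S+) (?P<requester>\S+) (?P<requestid>\S+) '
    r'(?P<operation>\S+) (?P<key>\S+) (?P<requesturi>"[^"]*"|-) '
    r'(?P<httpstatus>\S+) (?P<errorcode>\S+)'
)


def parse_access_log_line(line):
    """Return a dict of the leading log fields, or None if the line does not match."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    doc = match.groupdict()
    doc["requesturi"] = doc["requesturi"].strip('"')
    # Log timestamps look like 06/Feb/2019:00:00:38 +0000; store them in ISO 8601.
    parsed_time = datetime.strptime(doc["requestdatetime"], "%d/%b/%Y:%H:%M:%S %z")
    doc["requestdatetime"] = parsed_time.isoformat()
    return doc


# Made-up sample line that follows the documented field order.
SAMPLE_LINE = (
    'ownerid123 awsexamplebucket1 [06/Feb/2019:00:00:38 +0000] 192.0.2.3 '
    'requester123 3E57427F3EXAMPLE REST.GET.VERSIONING - '
    '"GET /awsexamplebucket1?versioning HTTP/1.1" 200 - 113 - 7 - "-" "S3Console/0.4" -'
)
print(parse_access_log_line(SAMPLE_LINE))
```

A Lambda function along these lines would read the delivered log object, run each line through the parser, and send the resulting documents to the Amazon ES domain.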

When designing a log analytics solution for high-frequency incoming data, you should consider buffering layers to avoid instability in the system. Buffering helps you streamline processes for unpredictable incoming log data. For such use cases, you can take advantage of managed services like Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose, and Amazon Managed Streaming for Apache Kafka (Amazon MSK).

Streaming services buffer data before delivering it to Amazon ES. This helps you avoid overwhelming your cluster with spiky ingestion events. Kinesis Data Firehose can reliably load data into Amazon ES. Kinesis Data Firehose lets you choose a buffer size of 1–100 MiBs and a buffer interval of 60–900 seconds when Amazon ES is selected as the destination. Kinesis Data Firehose also scales automatically to match the throughput of your data and requires no ongoing administration. For more information, see Ingest streaming data into Amazon Elasticsearch Service within the privacy of your VPC with Amazon Kinesis Data Firehose.

The following diagram illustrates the solution architecture.

Prerequisites

Before creating resources in AWS CloudFormation, you must enable server access logging on the source bucket. Open the S3 bucket properties and look for Amazon S3 access and delivery bucket. See the following screenshot.
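If you prefer to enable server access logging programmatically, a minimal Python sketch using the boto3 `put_bucket_logging` call could look like the following. The bucket names and prefix are placeholders, and the helper names are hypothetical. The delivery bucket must also permit S3 log delivery, which is outside the scope of this sketch.

```python
def build_logging_request(source_bucket, target_bucket, prefix):
    """Build the keyword arguments for the S3 PutBucketLogging call."""
    return {
        "Bucket": source_bucket,
        "BucketLoggingStatus": {
            "LoggingEnabled": {
                "TargetBucket": target_bucket,
                "TargetPrefix": prefix,
            }
        },
    }


def enable_access_logging(source_bucket, target_bucket, prefix):
    """Turn on server access logging; needs boto3 and AWS credentials at call time."""
    import boto3  # imported lazily so the request builder can be used without AWS access

    s3 = boto3.client("s3")
    s3.put_bucket_logging(**build_logging_request(source_bucket, target_bucket, prefix))


# Placeholder names; replace them with your own buckets before calling
# enable_access_logging("my-source-bucket", "my-access-log-bucket", "logs/").
print(build_logging_request("my-source-bucket", "my-access-log-bucket", "logs/"))
```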

You also need an AWS Identity and Access Management (IAM) user with sufficient permissions to interact with the AWS Management Console and related AWS services. The user must have access to create IAM roles and policies via the CloudFormation template.

Setting up the resources with AWS CloudFormation

First, deploy the CloudFormation template to create the core components of the architecture. AWS CloudFormation automates the deployment of technology and infrastructure in a safe and repeatable manner across multiple Regions and multiple accounts with the least amount of effort and time.

  1. Sign in to the console and choose the Region of the bucket storing the access log. For this post, I use us-east-1.
  2. Launch the stack:
  3. Choose Next.
  4. For Stack name, enter a name.
  5. On the Parameters page, enter the following parameters:
    1. VPC Configuration – Select any VPC that has at least two private subnets. The template deploys the Amazon ES service domain and Lambda within the VPC.
    2. Private subnets – Select two private subnets of the VPC. The route tables associated with the subnets must have a NAT gateway configuration and a VPC endpoint for Amazon S3 so that Lambda can connect to the bucket privately.
    3. Access log S3 bucket – Enter the S3 bucket where access logs are delivered. The template configures event notification on the bucket to trigger the Lambda function.
    4. Amazon ES domain name – Specify the Amazon ES domain name to be deployed through the template.
  6. Choose Next.
  7. On the next page, choose Next.
  8. Acknowledge resource creation under Capabilities and transforms and choose Create.

The stack takes about 10–15 minutes to complete. The CloudFormation stack does the following:

  • Creates an Amazon ES domain with fine-grained access control enabled on it. Fine-grained access control is configured with a primary user in the internal user database.
  • Creates an IAM role for the Lambda function with the required permissions to read from the S3 bucket and write to Amazon ES.
  • Creates the Lambda function within the same VPC as the Amazon ES elastic network interfaces (ENIs). Amazon ES places an ENI in the VPC for each of your data nodes. The communication from Lambda to the Amazon ES domain is via these ENIs.
  • Configures a file create event notification on the access log S3 bucket to trigger the Lambda function. The function code segments are discussed in detail in this GitHub project.

You must make several considerations before you proceed with a production-grade deployment. For this post, I use one primary shard with no replicas. As a best practice, we recommend deploying your domain into three Availability Zones with at least two replicas. This configuration lets Amazon ES distribute replica shards to different Availability Zones than their corresponding primary shards and improves the availability of your domain. For more information about sizing your Amazon ES, see Get started with Amazon Elasticsearch Service: T-shirt-size your domain.

We recommend setting the shard count based on your estimated index size, using 50 GB as a maximum target shard size. You should also define an index template to set the primary and replica shard counts before index creation. For more information about best practices, see Best practices for configuring your Amazon Elasticsearch Service domain.
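The sizing rule above reduces to a ceiling division. The following Python sketch assumes you already have an estimate of the index size in GB; the function name `primary_shard_count` is hypothetical, and the 50 GB default is the maximum target shard size mentioned in this post.

```python
import math


def primary_shard_count(index_size_gb, max_shard_size_gb=50.0):
    """Smallest number of primary shards that keeps each shard at or under the target size."""
    if index_size_gb <= 0 or max_shard_size_gb <= 0:
        raise ValueError("sizes must be positive")
    return max(1, math.ceil(index_size_gb / max_shard_size_gb))


# A 120 GB index needs three primary shards to stay under 50 GB per shard.
print(primary_shard_count(120))
```

The result is a lower bound; replicas multiply the total shard count on the cluster, so factor them in when sizing data nodes.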

For high-frequency incoming data, you can rotate indexes either per day or per week depending on the size of data being generated. You can use Index State Management to define custom management policies to automate routine tasks and apply them to indexes and index patterns.
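One common way to rotate indexes is to put the date into the index name at write time, so that each day or week gets its own index. The Python sketch below illustrates that idea; the `access-log` prefix and the naming scheme are assumptions for illustration, not the convention used by the post's Lambda function.

```python
from datetime import datetime


def index_name(prefix, when, rotation="daily"):
    """Build a time-based index name such as access-log-2020.08.03."""
    if rotation == "daily":
        return f"{prefix}-{when:%Y.%m.%d}"
    if rotation == "weekly":
        iso_year, iso_week, _ = when.isocalendar()
        return f"{prefix}-{iso_year}.w{iso_week:02d}"
    raise ValueError(f"unsupported rotation: {rotation!r}")


request_time = datetime(2020, 8, 3, 22, 5, 57)
print(index_name("access-log", request_time))
print(index_name("access-log", request_time, rotation="weekly"))
```

With names like these, a single index pattern with a wildcard suffix covers all rotated indexes in Kibana, and an Index State Management policy can delete or migrate the older ones.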

Creating the Kibana user

With Amazon ES, you can configure fine-grained users to control access to your data. Fine-grained access control adds multiple capabilities to give you tighter control over your data. This feature includes the ability to use roles to define granular permissions for indexes, documents, or fields and to extend Kibana with read-only views and secure multi-tenant support. For more information on granular access control, see Fine-Grained Access Control in Amazon Elasticsearch Service.

For this post, you create a fine-grained role for Kibana access and map it to a user.

  1. Navigate to Kibana and enter the primary user credentials:
    1. User name – adminuser01
    2. Password – [email protected]

To access Kibana, you must have access to the VPC. For more information about accessing Kibana, see Controlling Access to Kibana.

  1. Choose Security, Roles.
  2. For Role name, enter kibana_only_role.
  3. For Cluster-wide permissions, choose cluster_composite_ops_ro.
  4. For Index patterns, enter access-log and kibana.
  5. For Permissions: Action Groups, choose read, delete, index, and manage.
  6. Choose Save Role Definition.
  7. Choose Security, Internal User Database, and Create a New User.
  8. For Open Distro Security Roles, choose kibana_only_role (created earlier).
  9. Choose Submit.

The user kibanauser01 now has full access to Kibana and access-logs indexes. You can log in to Kibana with this user and create the visuals and dashboards.

Building dashboards

You can use Kibana to build interactive visuals and analyze the trends and combine the visuals for different use cases in a dashboard. For example, you may want to see the number of requests made to the buckets in the last two days.

  1. Log in to Kibana using kibanauser01.
  2. Create an index pattern and set the time range.
  3. On the Visualize section of your Kibana dashboard, add a new visualization.
  4. Choose Vertical Bar.

You can select any time range and visual based on your requirements.

  1. Choose the index pattern and then configure your graph options.
  2. In the Metrics pane, expand Y-Axis.
  3. For Aggregation, choose Count.
  4. For Custom Label, enter Request Count.
  5. Expand the X-Axis.
  6. For Aggregation, choose Terms.
  7. For Field, choose bucket.
  8. For Order By, choose metric: Request Count.
  9. Choose Apply changes.
  10. Choose Add sub-bucket and expand the Split Series.
  11. For Sub Aggregation, choose Date Histogram.
  12. For Field, choose requestdatetime.
  13. For Interval, choose Daily.
  14. Apply the changes by choosing the play icon at the top of the page.

You should see the visual on the right side, similar to the following screenshot.

You can combine graphs of different use cases into a dashboard. I have built some example graphs for general use cases like the number of operations per bucket, user action breakdown for buckets, HTTP status rate, top users, and tabular formatted error details. See the following screenshots.

Cleaning up

Delete all the resources deployed through the CloudFormation template to avoid any unintended costs.

  1. Disable server access logging on the source bucket.
  2. On the AWS CloudFormation console, identify the appropriate stacks and delete them.

Summary

This post detailed a solution to visualize and monitor Amazon S3 access logs using Amazon ES to ensure compliance, perform security audits, and discover risks and patterns at scale with minimal latency. To learn about best practices of Amazon ES, see Amazon Elasticsearch Service Best Practices. To learn how to analyze and create a dashboard of data stored in Amazon ES, see the AWS Security Blog.


About the Authors

Mahesh Goyal is a Data Architect in Big Data at AWS. He works with customers in their journey to the cloud with a focus on big data and data warehouses. In his spare time, Mahesh likes to listen to music and explore new food places with his family.