Tag Archives: Processing

[$] An introduction to asynchronous Python

Post Syndicated from jake original https://lwn.net/Articles/726600/rss

In his PyCon 2017 talk, Miguel
Grinberg wanted to introduce asynchronous programming with Python to
complete beginners. There is a lot of talk about asynchronous Python,
especially with the advent of the
asyncio module
, but there are multiple ways to create
asynchronous Python programs, many of which have been available for quite
some time. In the talk, Grinberg took something of a step back from the
intricacies of those solutions to look at what asynchronous processing
means at a
higher level.

Synchronizing Amazon S3 Buckets Using AWS Step Functions

Post Syndicated from Andy Katz original https://aws.amazon.com/blogs/compute/synchronizing-amazon-s3-buckets-using-aws-step-functions/

Constantin Gonzalez is a Principal Solutions Architect at AWS

In my free time, I run a small blog that uses Amazon S3 to host static content and Amazon CloudFront to distribute it world-wide. I use a home-grown, static website generator to create and upload my blog content onto S3.

My blog uses two S3 buckets: one for staging and testing, and one for production. As a website owner, I want to update the production bucket with all changes from the staging bucket in a reliable and efficient way, without having to create and populate a new bucket from scratch. Therefore, to synchronize files between these two buckets, I use AWS Lambda and AWS Step Functions.

In this post, I show how you can use Step Functions to build a scalable synchronization engine for S3 buckets and learn some common patterns for designing Step Functions state machines while you do so.

Step Functions overview

Step Functions makes it easy to coordinate the components of distributed applications and microservices using visual workflows. Building applications from individual components that each perform a discrete function lets you scale and change applications quickly.

While this particular example focuses on synchronizing objects between two S3 buckets, it can be generalized to any other use case that involves coordinated processing of any number of objects in S3 buckets, or other, similar data processing patterns.

Bucket replication options

Before I dive into the details on how this particular example works, take a look at some alternatives for copying or replicating data between two Amazon S3 buckets:

  • The AWS CLI provides customers with a powerful aws s3 sync command that can synchronize the contents of one bucket with another.
  • S3DistCP is a powerful tool for users of Amazon EMR that can efficiently load, save, or copy large amounts of data between S3 buckets and HDFS.
  • The S3 cross-region replication functionality enables automatic, asynchronous copying of objects across buckets in different AWS regions.

In this use case, you are looking for a slightly different bucket synchronization solution that:

  • Works within the same region
  • Is more scalable than a CLI approach running on a single machine
  • Doesn’t require managing any servers
  • Uses a more finely grained cost model than the hourly based Amazon EMR approach

You need a scalable, serverless, and customizable bucket synchronization utility.

Solution architecture

Your solution needs to do three things:

  1. Copy all objects from a source bucket into a destination bucket, but leave out objects that are already present, for efficiency.
  2. Delete all "orphaned" objects from the destination bucket that aren’t present on the source bucket, because you don’t want obsolete objects lying around.
  3. Keep track of all objects for #1 and #2, regardless of how many objects there are.

In the beginning, you read in the source and destination buckets as parameters and perform basic parameter validation. Then, you operate two separate, independent loops, one for copying missing objects and one for deleting obsolete objects. Each loop is a sequence of Step Functions states that read in chunks of S3 object lists and use the continuation token to decide in a choice state whether to continue the loop or not.

This solution is based on the following architecture that uses Step Functions, Lambda, and two S3 buckets:

As you can see, this setup involves no servers, just two main building blocks:

  • Step Functions manages the overall flow of synchronizing the objects from the source bucket with the destination bucket.
  • A set of Lambda functions carry out the individual steps necessary to perform the work, such as validating input, getting lists of objects from source and destination buckets, copying or deleting objects in batches, and so on.

To understand the synchronization flow in more detail, look at the Step Functions state machine diagram for this example.

Walkthrough

Here’s a detailed discussion of how this works.

To follow along, use the code in the sync-buckets-state-machine GitHub repo. The code comes with a ready-to-run deployment script in Python that takes care of all the IAM roles, policies, Lambda functions, and of course the Step Functions state machine deployment using AWS CloudFormation, as well as instructions on how to use it.

Fine print: Use at your own risk

Before I start, here are some disclaimers:

  • Educational purposes only.

    The following example and code are intended for educational purposes only. Make sure that you customize, test, and review it on your own before using any of this in production.

  • S3 object deletion.

    In particular, using the code included below may delete objects on S3 in order to perform synchronization. Make sure that you have backups of your data. In particular, consider using the Amazon S3 Versioning feature to protect yourself against unintended data modification or deletion.

Step Functions execution starts with an initial set of parameters that contain the source and destination bucket names in JSON:

{
    "source":       "my-source-bucket-name",
    "destination":  "my-destination-bucket-name"
}

Armed with this data, Step Functions execution proceeds as follows.

Step 1: Detect the bucket region

First, you need to know the regions where your buckets reside. In this case, take advantage of the Step Functions Parallel state. This allows you to use a Lambda function get_bucket_location.py inside two different, parallel branches of task states:

  • FindRegionForSourceBucket
  • FindRegionForDestinationBucket

Each task state receives one bucket name as an input parameter, then detects the region corresponding to "their" bucket. The output of these functions is collected in a result array containing one element per parallel function.

Step 2: Combine the parallel states

The output of a parallel state is a list with all the individual branches’ outputs. To combine them into a single structure, use a Lambda function called combine_dicts.py in its own CombineRegionOutputs task state. The function combines the two outputs from step 1 into a single JSON dict that provides you with the necessary region information for each bucket.

Step 3: Validate the input

In this walkthrough, you only support buckets that reside in the same region, so you need to decide if the input is valid or if the user has given you two buckets in different regions. To find out, use a Lambda function called validate_input.py in the ValidateInput task state that tests if the two regions from the previous step are equal. The output is a Boolean.

Step 4: Branch the workflow

Use another type of Step Functions state, a Choice state, which branches into a Failure state if the comparison in step 3 yields false, or proceeds with the remaining steps if the comparison was successful.

Step 5: Execute in parallel

The actual work is happening in another Parallel state. Both branches of this state are very similar to each other and they re-use some of the Lambda function code.

Each parallel branch implements a looping pattern across the following steps:

  1. Use a Pass state to inject either the string value "source" (InjectSourceBucket) or "destination" (InjectDestinationBucket) into the listBucket attribute of the state document.

    The next step uses either the source or the destination bucket, depending on the branch, while executing the same, generic Lambda function. You don’t need two Lambda functions that differ only slightly. This step illustrates how to use Pass states as a way of injecting constant parameters into your state machine and as a way of controlling step behavior while re-using common step execution code.

  2. The next step UpdateSourceKeyList/UpdateDestinationKeyList lists objects in the given bucket.

    Remember that the previous step injected either "source" or "destination" into the state document’s listBucket attribute. This step uses the same list_bucket.py Lambda function to list objects in an S3 bucket. The listBucket attribute of its input decides which bucket to list. In the left branch of the main parallel state, use the list of source objects to work through copying missing objects. The right branch uses the list of destination objects, to check if they have a corresponding object in the source bucket and eliminate any orphaned objects. Orphans don’t have a source object of the same S3 key.

  3. This step performs the actual work. In the left branch, the CopySourceKeys step uses the copy_keys.py Lambda function to go through the list of source objects provided by the previous step, then copies any missing object into the destination bucket. Its sister step in the other branch, DeleteOrphanedKeys, uses its destination bucket key list to test whether each object from the destination bucket has a corresponding source object, then deletes any orphaned objects.

  4. The S3 ListObjects API action is designed to be scalable across many objects in a bucket. Therefore, it returns object lists in chunks of configurable size, along with a continuation token. If the API result has a continuation token, it means that there are more objects in this list. You can work from token to token to continue getting object list chunks, until you get no more continuation tokens.

By breaking down large amounts of work into chunks, you can make sure each chunk is completed within the timeframe allocated for the Lambda function, and within the maximum input/output data size for a Step Functions state.

This approach comes with a slight tradeoff: the more objects you process at one time in a given chunk, the faster you are done. There’s less overhead for managing individual chunks. On the other hand, if you process too many objects within the same chunk, you risk going over time and space limits of the processing Lambda function or the Step Functions state so the work cannot be completed.

In this particular case, use a Lambda function that maximizes the number of objects listed from the S3 bucket that can be stored in the input/output state data. This is currently up to 32,768 bytes, assuming (based on some experimentation) that the execution of the COPY/DELETE requests in the processing states can always complete in time.

A more sophisticated approach would use the Step Functions retry/catch state attributes to account for any time limits encountered and adjust the list size accordingly through some list site adjusting.

Step 6: Test for completion

Because the presence of a continuation token in the S3 ListObjects output signals that you are not done processing all objects yet, use a Choice state to test for its presence. If a continuation token exists, it branches into the UpdateSourceKeyList step, which uses the token to get to the next chunk of objects. If there is no token, you’re done. The state machine then branches into the FinishCopyBranch/FinishDeleteBranch state.

By using Choice states like this, you can create loops exactly like the old times, when you didn’t have for statements and used branches in assembly code instead!

Step 7: Success!

Finally, you’re done, and can step into your final Success state.

Lessons learned

When implementing this use case with Step Functions and Lambda, I learned the following things:

  • Sometimes, it is necessary to manipulate the JSON state of a Step Functions state machine with just a few lines of code that hardly seem to warrant their own Lambda function. This is ok, and the cost is actually pretty low given Lambda’s 100 millisecond billing granularity. The upside is that functions like these can be helpful to make the data more palatable for the following steps or for facilitating Choice states. An example here would be the combine_dicts.py function.
  • Pass states can be useful beyond debugging and tracing, they can be used to inject arbitrary values into your state JSON and guide generic Lambda functions into doing specific things.
  • Choice states are your friend because you can build while-loops with them. This allows you to reliably grind through large amounts of data with the patience of an engine that currently supports execution times of up to 1 year.

    Currently, there is an execution history limit of 25,000 events. Each Lambda task state execution takes up 5 events, while each choice state takes 2 events for a total of 7 events per loop. This means you can loop about 3500 times with this state machine. For even more scalability, you can split up work across multiple Step Functions executions through object key sharding or similar approaches.

  • It’s not necessary to spend a lot of time coding exception handling within your Lambda functions. You can delegate all exception handling to Step Functions and instead simplify your functions as much as possible.

  • Step Functions are great replacements for shell scripts. This could have been a shell script, but then I would have had to worry about where to execute it reliably, how to scale it if it went beyond a few thousand objects, etc. Think of Step Functions and Lambda as tools for scripting at a cloud level, beyond the boundaries of servers or containers. "Serverless" here also means "boundary-less".

Summary

This approach gives you scalability by breaking down any number of S3 objects into chunks, then using Step Functions to control logic to work through these objects in a scalable, serverless, and fully managed way.

To take a look at the code or tweak it for your own needs, use the code in the sync-buckets-state-machine GitHub repo.

To see more examples, please visit the Step Functions Getting Started page.

Enjoy!

Court Suspends Ban on Roku Sales in Mexico

Post Syndicated from Ernesto original https://torrentfreak.com/court-suspends-ban-on-roku-sales-in-mexico-170623/

Last week, news broke that the Superior Court of Justice of the City of Mexico had issued a ban on Roku sales.

The order prohibited stores such as Amazon, Liverpool, El Palacio de Hierro, and Sears from importing and selling the devices. In addition, several banks were told stop processing payments from accounts that are linked to pirated services on Roku.

While Roku itself is not offering any pirated content, there is a market for third-party pirate channels outside the Roku Channel Store, which turn the boxes into pirate tools. Cablevision filed a complaint about this unauthorized use which eventually resulted in the ban.

The news generated headlines all over the world and was opposed immediately by several of the parties involved. Yesterday, a federal judge decided to suspend the import and sales ban, at least temporarily.

As a result, local vendors can resume their sales of the popular media player.

“Roku is pleased with today’s court decision, which paves the way for sales of Roku devices to resume in Mexico,” Roku’s General Counsel Steve Kay informed TorrentFreak after he heard the news.

Roku

TorrentFreak has not been able to get a copy of the suspension order, but it’s likely that the court wants to review the case in more detail before a final decision is made.

While streaming player piracy is seen as one of the greatest threats the entertainment industry faces today, the Roku ban went quite far. In a way, it would be similar to banning the Chrome browser because certain add-ons and sites allow users to stream pirated movies.

Roku, meanwhile, says it will continue to work with rightholders and other stakeholders to prevent piracy on its platform, to the best of their ability.

“Piracy is a problem the industry at large is facing,” Key tells TorrentFreak.

“We prohibit copyright infringement of any kind on the Roku platform. We actively work to prevent third-parties from using our platform to distribute copyright infringing content. Moreover, we have been actively working with other industry stakeholders on a wide range of anti-piracy initiatives.”

Source: TF, for the latest info on copyright, file-sharing, torrent sites and ANONYMOUS VPN services.

Building Loosely Coupled, Scalable, C# Applications with Amazon SQS and Amazon SNS

Post Syndicated from Tara Van Unen original https://aws.amazon.com/blogs/compute/building-loosely-coupled-scalable-c-applications-with-amazon-sqs-and-amazon-sns/

 
Stephen Liedig, Solutions Architect

 

One of the many challenges professional software architects and developers face is how to make cloud-native applications scalable, fault-tolerant, and highly available.

Fundamental to your project success is understanding the importance of making systems highly cohesive and loosely coupled. That means considering the multi-dimensional facets of system coupling to support the distributed nature of the applications that you are building for the cloud.

By that, I mean addressing not only the application-level coupling (managing incoming and outgoing dependencies), but also considering the impacts of of platform, spatial, and temporal coupling of your systems. Platform coupling relates to the interoperability, or lack thereof, of heterogeneous systems components. Spatial coupling deals with managing components at a network topology level or protocol level. Temporal, or runtime coupling, refers to the ability of a component within your system to do any kind of meaningful work while it is performing a synchronous, blocking operation.

The AWS messaging services, Amazon SQS and Amazon SNS, help you deal with these forms of coupling by providing mechanisms for:

  • Reliable, durable, and fault-tolerant delivery of messages between application components
  • Logical decomposition of systems and increased autonomy of components
  • Creating unidirectional, non-blocking operations, temporarily decoupling system components at runtime
  • Decreasing the dependencies that components have on each other through standard communication and network channels

Following on the recent topic, Building Scalable Applications and Microservices: Adding Messaging to Your Toolbox, in this post, I look at some of the ways you can introduce SQS and SNS into your architectures to decouple your components, and show how you can implement them using C#.

Walkthrough

To illustrate some of these concepts, consider a web application that processes customer orders. As good architects and developers, you have followed best practices and made your application scalable and highly available. Your solution included implementing load balancing, dynamic scaling across multiple Availability Zones, and persisting orders in a Multi-AZ Amazon RDS database instance, as in the following diagram.


In this example, the application is responsible for handling and persisting the order data, as well as dealing with increases in traffic for popular items.

One potential point of vulnerability in the order processing workflow is in saving the order in the database. The business expects that every order has been persisted into the database. However, any potential deadlock, race condition, or network issue could cause the persistence of the order to fail. Then, the order is lost with no recourse to restore the order.

With good logging capability, you may be able to identify when an error occurred and which customer’s order failed. This wouldn’t allow you to “restore” the transaction, and by that stage, your customer is no longer your customer.

As illustrated in the following diagram, introducing an SQS queue helps improve your ordering application. Using the queue isolates the processing logic into its own component and runs it in a separate process from the web application. This, in turn, allows the system to be more resilient to spikes in traffic, while allowing work to be performed only as fast as necessary in order to manage costs.


In addition, you now have a mechanism for persisting orders as messages (with the queue acting as a temporary database), and have moved the scope of your transaction with your database further down the stack. In the event of an application exception or transaction failure, this ensures that the order processing can be retired or redirected to the Amazon SQS Dead Letter Queue (DLQ), for re-processing at a later stage. (See the recent post, Using Amazon SQS Dead-Letter Queues to Control Message Failure, for more information on dead-letter queues.)

Scaling the order processing nodes

This change allows you now to scale the web application frontend independently from the processing nodes. The frontend application can continue to scale based on metrics such as CPU usage, or the number of requests hitting the load balancer. Processing nodes can scale based on the number of orders in the queue. Here is an example of scale-in and scale-out alarms that you would associate with the scaling policy.

Scale-out Alarm

aws cloudwatch put-metric-alarm --alarm-name AddCapacityToCustomerOrderQueue --metric-name ApproximateNumberOfMessagesVisible --namespace "AWS/SQS" 
--statistic Average --period 300 --threshold 3 --comparison-operator GreaterThanOrEqualToThreshold --dimensions Name=QueueName,Value=customer-orders
--evaluation-periods 2 --alarm-actions <arn of the scale-out autoscaling policy>

Scale-in Alarm

aws cloudwatch put-metric-alarm --alarm-name RemoveCapacityFromCustomerOrderQueue --metric-name ApproximateNumberOfMessagesVisible --namespace "AWS/SQS" 
 --statistic Average --period 300 --threshold 1 --comparison-operator LessThanOrEqualToThreshold --dimensions Name=QueueName,Value=customer-orders
 --evaluation-periods 2 --alarm-actions <arn of the scale-in autoscaling policy>

In the above example, use the ApproximateNumberOfMessagesVisible metric to discover the queue length and drive the scaling policy of the Auto Scaling group. Another useful metric is ApproximateAgeOfOldestMessage, when applications have time-sensitive messages and developers need to ensure that messages are processed within a specific time period.

Scaling the order processing implementation

On top of scaling at an infrastructure level using Auto Scaling, make sure to take advantage of the processing power of your Amazon EC2 instances by using as many of the available threads as possible. There are several ways to implement this. In this post, we build a Windows service that uses the BackgroundWorker class to process the messages from the queue.

Here’s a closer look at the implementation. In the first section of the consuming application, use a loop to continually poll the queue for new messages, and construct a ReceiveMessageRequest variable.

public static void PollQueue()
{
    while (_running)
    {
        Task<ReceiveMessageResponse> receiveMessageResponse;

        // Pull messages off the queue
        using (var sqs = new AmazonSQSClient())
        {
            const int maxMessages = 10;  // 1-10

            //Receiving a message
            var receiveMessageRequest = new ReceiveMessageRequest
            {
                // Get URL from Configuration
                QueueUrl = _queueUrl, 
                // The maximum number of messages to return. 
                // Fewer messages might be returned. 
                MaxNumberOfMessages = maxMessages, 
                // A list of attributes that need to be returned with message.
                AttributeNames = new List<string> { "All" },
                // Enable long polling. 
                // Time to wait for message to arrive on queue.
                WaitTimeSeconds = 5 
            };

            receiveMessageResponse = sqs.ReceiveMessageAsync(receiveMessageRequest);
        }

The WaitTimeSeconds property of the ReceiveMessageRequest specifies the duration (in seconds) that the call waits for a message to arrive in the queue before returning a response to the calling application. There are a few benefits to using long polling:

  • It reduces the number of empty responses by allowing SQS to wait until a message is available in the queue before sending a response.
  • It eliminates false empty responses by querying all (rather than a limited number) of the servers.
  • It returns messages as soon any message becomes available.

For more information, see Amazon SQS Long Polling.

After you have returned messages from the queue, you can start to process them by looping through each message in the response and invoking a new BackgroundWorker thread.

// Process messages
if (receiveMessageResponse.Result.Messages != null)
{
    foreach (var message in receiveMessageResponse.Result.Messages)
    {
        Console.WriteLine("Received SQS message, starting worker thread");

        // Create background worker to process message
        BackgroundWorker worker = new BackgroundWorker();
        worker.DoWork += (obj, e) => ProcessMessage(message);
        worker.RunWorkerAsync();
    }
}
else
{
    Console.WriteLine("No messages on queue");
}

The event handler, ProcessMessage, is where you implement business logic for processing orders. It is important to have a good understanding of how long a typical transaction takes so you can set a message VisibilityTimeout that is long enough to complete your operation. If order processing takes longer than the specified timeout period, the message becomes visible on the queue. Other nodes may pick it and process the same order twice, leading to unintended consequences.

Handling Duplicate Messages

In order to manage duplicate messages, seek to make your processing application idempotent. In mathematics, idempotent describes a function that produces the same result if it is applied to itself:

f(x) = f(f(x))

No matter how many times you process the same message, the end result is the same (definition from Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions, Hohpe and Wolf, 2004).

There are several strategies you could apply to achieve this:

  • Create messages that have inherent idempotent characteristics. That is, they are non-transactional in nature and are unique at a specified point in time. Rather than saying “place new order for Customer A,” which adds a duplicate order to the customer, use “place order <orderid> on <timestamp> for Customer A,” which creates a single order no matter how often it is persisted.
  • Deliver your messages via an Amazon SQS FIFO queue, which provides the benefits of message sequencing, but also mechanisms for content-based deduplication. You can deduplicate using the MessageDeduplicationId property on the SendMessage request or by enabling content-based deduplication on the queue, which generates a hash for MessageDeduplicationId, based on the content of the message, not the attributes.
var sendMessageRequest = new SendMessageRequest
{
    QueueUrl = _queueUrl,
    MessageBody = JsonConvert.SerializeObject(order),
    MessageGroupId = Guid.NewGuid().ToString("N"),
    MessageDeduplicationId = Guid.NewGuid().ToString("N")
};
  • If using SQS FIFO queues is not an option, keep a message log of all messages attributes processed for a specified period of time, as an alternative to message deduplication on the receiving end. Verifying the existence of the message in the log before processing the message adds additional computational overhead to your processing. This can be minimized through low latency persistence solutions such as Amazon DynamoDB. Bear in mind that this solution is dependent on the successful, distributed transaction of the message and the message log.

Handling exceptions

Because of the distributed nature of SQS queues, it does not automatically delete the message. Therefore, you must explicitly delete the message from the queue after processing it, using the message ReceiptHandle property (see the following code example).

However, if at any stage you have an exception, avoid handling it as you normally would. The intention is to make sure that the message ends back on the queue, so that you can gracefully deal with intermittent failures. Instead, log the exception to capture diagnostic information, and swallow it.

By not explicitly deleting the message from the queue, you can take advantage of the VisibilityTimeout behavior described earlier. Gracefully handle the message processing failure and make the unprocessed message available to other nodes to process.

In the event that subsequent retries fail, SQS automatically moves the message to the configured DLQ after the configured number of receives has been reached. You can further investigate why the order process failed. Most importantly, the order has not been lost, and your customer is still your customer.

private static void ProcessMessage(Message message)
{
    using (var sqs = new AmazonSQSClient())
    {
        try
        {
            Console.WriteLine("Processing message id: {0}", message.MessageId);

            // Implement messaging processing here
            // Ensure no downstream resource contention (parallel processing)
            // <your order processing logic in here…>
            Console.WriteLine("{0} Thread {1}: {2}", DateTime.Now.ToString("s"), Thread.CurrentThread.ManagedThreadId, message.MessageId);
            
            // Delete the message off the queue. 
            // Receipt handle is the identifier you must provide 
            // when deleting the message.
            var deleteRequest = new DeleteMessageRequest(_queueName, message.ReceiptHandle);
            sqs.DeleteMessageAsync(deleteRequest);
            Console.WriteLine("Processed message id: {0}", message.MessageId);

        }
        catch (Exception ex)
        {
            // Do nothing.
            // Swallow exception, message will return to the queue when 
            // visibility timeout has been exceeded.
            Console.WriteLine("Could not process message due to error. Exception: {0}", ex.Message);
        }
    }
}

Using SQS to adapt to changing business requirements

One of the benefits of introducing a message queue is that you can accommodate new business requirements without dramatically affecting your application.

If, for example, the business decided that all orders placed over $5000 are to be handled as a priority, you could introduce a new “priority order” queue. The way the orders are processed does not change. The only significant change to the processing application is to ensure that messages from the “priority order” queue are processed before the “standard order” queue.

The following diagram shows how this logic could be isolated in an “order dispatcher,” whose only purpose is to route order messages to the appropriate queue based on whether the order exceeds $5000. Nothing on the web application or the processing nodes changes other than the target queue to which the order is sent. The rates at which orders are processed can be achieved by modifying the poll rates and scalability settings that I have already discussed.

Extending the design pattern with Amazon SNS

Amazon SNS supports reliable publish-subscribe (pub-sub) scenarios and push notifications to known endpoints across a wide variety of protocols. It eliminates the need to periodically check or poll for new information and updates. SNS supports:

  • Reliable storage of messages for immediate or delayed processing
  • Publish / subscribe – direct, broadcast, targeted “push” messaging
  • Multiple subscriber protocols
  • Amazon SQS, HTTP, HTTPS, email, SMS, mobile push, AWS Lambda

With these capabilities, you can provide parallel asynchronous processing of orders in the system and extend it to support any number of different business use cases without affecting the production environment. This is commonly referred to as a “fanout” scenario.

Rather than your web application pushing orders to a queue for processing, send a notification via SNS. The SNS messages are sent to a topic and then replicated and pushed to multiple SQS queues and Lambda functions for processing.

As the diagram above shows, you have the development team consuming “live” data as they work on the next version of the processing application, or potentially using the messages to troubleshoot issues in production.

Marketing is consuming all order information, via a Lambda function that has subscribed to the SNS topic, inserting the records into an Amazon Redshift warehouse for analysis.

All of this, of course, is happening without affecting your order processing application.

Summary

While I haven’t dived deep into the specifics of each service, I have discussed how these services can be applied at an architectural level to build loosely coupled systems that facilitate multiple business use cases. I’ve also shown you how to use infrastructure and application-level scaling techniques, so you can get the most out of your EC2 instances.

One of the many benefits of using these managed services is how quickly and easily you can implement powerful messaging capabilities in your systems, and lower the capital and operational costs of managing your own messaging middleware.

Using Amazon SQS and Amazon SNS together can provide you with a powerful mechanism for decoupling application components. This should be part of design considerations as you architect for the cloud.

For more information, see the Amazon SQS Developer Guide and Amazon SNS Developer Guide. You’ll find tutorials on all the concepts covered in this post, and more. To can get started using the AWS console or SDK of your choice visit:

Happy messaging!

Roku Sales Banned in Mexico Over Piracy Concerns

Post Syndicated from Ernesto original https://torrentfreak.com/roku-sales-banned-in-mexico-over-piracy-concerns-170619/

Online streaming piracy is on the rise and many people use dedicated media players to watch it through their regular TV.

While a lot of attention has been on Kodi, there are other players on the market that allow people to do the same. Roku, for example, has been doing very well too.

Like Kodi, Roku media players don’t offer any pirated content out of the box. In fact, they can be hooked up to a wide variety of legal streaming options including HBO Go, Hulu, and Netflix. Still, there is also a market for third-party pirate channels, outside the Roku Channel Store, which turn the boxes into pirate tools.

This pirate angle has now resulted in a ban on Roku sales in Mexico, according to a report in Milenio.

The ban was issued by the Superior Court of Justice of the City of Mexico, following a complaint from Cablevision. The order in question prohibits stores such as Amazon, Liverpool, El Palacio de Hierro, and Sears from importing and selling the devices.

In addition, the court also instructs banks including Banorte and BBVA Bancomer to stop processing payments from a long list of accounts linked to pirated services on Roku.

The main reason for the order is the availability of pirated content through Roku, but banning the device itself is utterly comprehensive. It would be similar to banning all Android-based devices because certain apps allow users to stream copyrighted content without permission.

Roku

Roku has yet to release an official statement on the court order. TorrentFreak reached out to the company but hadn’t heard back at the time of publication.

It’s clear, however, that streaming players are among the top concerns for copyright holders. Motion Picture Association boss Stan McCoy recently characterized the use of streaming players to access infringing content as “Piracy 3.0.

“If you think of old-fashioned peer-to-peer piracy as 1.0, and then online illegal streaming websites as 2.0, in the audio-visual sector, in particular, we now face challenge number 3.0, which is what I’ll call the challenge of illegal streaming devices,” McCoy said earlier this month.

Unlike the court order in Mexico, however, McCoy stressed that the devices themselves, and software such as Kodi, are ‘probably’ not illegal. However, copyright-infringing pirate add-ons have the capability to turn them into an unprecedented piracy threat.

Source: TF, for the latest info on copyright, file-sharing, torrent sites and ANONYMOUS VPN services.

G’MIC 2.0

Post Syndicated from ris original https://lwn.net/Articles/724886/rss

G’MIC is a generic, extensible framework for image processing, often used
as a plug-in for GIMP. Version 2.0 has been released. “One
of the major new features of this version 2.0 is the re-implementation of
the plug-in code, from scratch. The repository G’MIC-Qt developed by Sébastien (an experienced member
of the team) is a Qt-based version of the plug-in interface, being as
independent as possible of the widget API provided by GIMP.
” The
announcement has much more details about G’MIC and how it can be used. LWN
looked at G’MIC in August 2014.

Using Amazon SQS Dead-Letter Queues to Control Message Failure

Post Syndicated from Tara Van Unen original https://aws.amazon.com/blogs/compute/using-amazon-sqs-dead-letter-queues-to-control-message-failure/


Michael G. Khmelnitsky, Senior Programmer Writer

 

Sometimes, messages can’t be processed because of a variety of possible issues, such as erroneous conditions within the producer or consumer application. For example, if a user places an order within a certain number of minutes of creating an account, the producer might pass a message with an empty string instead of a customer identifier. Occasionally, producers and consumers might fail to interpret aspects of the protocol that they use to communicate, causing message corruption or loss. Also, the consumer’s hardware errors might corrupt message payload. For these reasons, messages that can’t be processed in a timely manner are delivered to a dead-letter queue.

The recent post Building Scalable Applications and Microservices: Adding Messaging to Your Toolbox gives an overview of messaging in the microservice architecture of modern applications. This post explains how and when you should use dead-letter queues to gain better control over message handling in your applications. It also offers some resources for configuring a dead-letter queue in Amazon Simple Queue Service (SQS).

What are the benefits of dead-letter queues?

The main task of a dead-letter queue is handling message failure. A dead-letter queue lets you set aside and isolate messages that can’t be processed correctly to determine why their processing didn’t succeed. Setting up a dead-letter queue allows you to do the following:

  • Configure an alarm for any messages delivered to a dead-letter queue.
  • Examine logs for exceptions that might have caused messages to be delivered to a dead-letter queue.
  • Analyze the contents of messages delivered to a dead-letter queue to diagnose software or the producer’s or consumer’s hardware issues.
  • Determine whether you have given your consumer sufficient time to process messages.

How do high-throughput, unordered queues handle message failure?

High-throughput, unordered queues (sometimes called standard or storage queues) keep processing messages until the expiration of the retention period. This helps ensure continuous processing of messages, which minimizes the chances of your queue being blocked by messages that can’t be processed. It also ensures fast recovery for your queue.

In a system that processes thousands of messages, having a large number of messages that the consumer repeatedly fails to acknowledge and delete might increase costs and place extra load on the hardware. Instead of trying to process failing messages until they expire, it is better to move them to a dead-letter queue after a few processing attempts.

Note: This queue type often allows a high number of in-flight messages. If the majority of your messages can’t be consumed and aren’t sent to a dead-letter queue, your rate of processing valid messages can slow down. Thus, to maintain the efficiency of your queue, you must ensure that your application handles message processing correctly.

How do FIFO queues handle message failure?

FIFO (first-in-first-out) queues (sometimes called service bus queues) help ensure exactly-once processing by consuming messages in sequence from a message group. Thus, although the consumer can continue to retrieve ordered messages from another message group, the first message group remains unavailable until the message blocking the queue is processed successfully.

Note: This queue type often allows a lower number of in-flight messages. Thus, to help ensure that your FIFO queue doesn’t get blocked by a message, you must ensure that your application handles message processing correctly.

When should I use a dead-letter queue?

  • Do use dead-letter queues with high-throughput, unordered queues. You should always take advantage of dead-letter queues when your applications don’t depend on ordering. Dead-letter queues can help you troubleshoot incorrect message transmission operations. Note: Even when you use dead-letter queues, you should continue to monitor your queues and retry sending messages that fail for transient reasons.
  • Do use dead-letter queues to decrease the number of messages and to reduce the possibility of exposing your system to poison-pill messages (messages that can be received but can’t be processed).
  • Don’t use a dead-letter queue with high-throughput, unordered queues when you want to be able to keep retrying the transmission of a message indefinitely. For example, don’t use a dead-letter queue if your program must wait for a dependent process to become active or available.
  • Don’t use a dead-letter queue with a FIFO queue if you don’t want to break the exact order of messages or operations. For example, don’t use a dead-letter queue with instructions in an Edit Decision List (EDL) for a video editing suite, where changing the order of edits changes the context of subsequent edits.

How do I get started with dead-letter queues in Amazon SQS?

Amazon SQS is a fully managed service that offers reliable, highly scalable hosted queues for exchanging messages between applications or microservices. Amazon SQS moves data between distributed application components and helps you decouple these components. It supports both standard queues and FIFO queues. To configure a queue as a dead-letter queue, you can use the AWS Management Console or the Amazon SQS SetQueueAttributes API action.

To get started with dead-letter queues in Amazon SQS, see the following topics in the Amazon SQS Developer Guide:

To start working with dead-letter queues programmatically, see the following resources:

Preserving the Music of Austin City Limits

Post Syndicated from Andy Klein original https://www.backblaze.com/blog/preserving-the-music-of-austin-city-limits/

Austin City Limits

KLRU-TV, Austin PBS created Austin City Limits 42 years ago and has produced it ever since. Austin City Limits is the longest-running music series in television history. Over the years, KLRU accumulated over 550 episodes and thousands of hours of unaired footage stored on videotape. When KLRU decided to preserve their collection they turned to Backblaze for help with uploading and storing this unparalleled musical anthology in the Backblaze B2 cloud.

Upload: Backblaze B2 Fireball

KLRU started their preservation efforts by digitizing their collection of videotapes. After some internal processing, they were ready to upload the files to Backblaze, but there was a problem – one facing many organizations with a stash of historical digital data – their network connection was “slow”. It was fine for daily work, but uploading terabytes of data was not going to work.

“We would not have been able to get this project off the ground without the B2 Fireball.” – James Cole, KLRU

Backblaze B2 Fireball to the rescue. The B2 Fireball is a secure, shippable, data ingest system capable of transporting up to 40 terabytes of data from your location to Backblaze where the data is ingested into your B2 account. Designed for those organizations that have large amounts of data locally that they want to store in the cloud, the Backblaze B2 Fireball was just what KLRU needed to get the project started.

Preserve: Live Archive with B2

The KLRU staff is working hard to digitize and restore their entire musical archive and they are committed to preserving their data by having both a local copy and a cloud copy of their files. By choosing Backblaze B2 Cloud Storage versus a near-line or off-line storage solution KLRU now has an affordable live archive of their data they can access without delay anytime they need.

You can download and read the entire Austin City Limits case study for more details on how KLRU used B2 as part of their strategy to preserve their entire catalog of Austin City Limits content for future generations.

Dave Grohl Austin City Limits performance

The post Preserving the Music of Austin City Limits appeared first on Backblaze Blog | Cloud Storage & Cloud Backup.

Some non-lessons from WannaCry

Post Syndicated from Robert Graham original http://blog.erratasec.com/2017/06/some-non-lessons-from-wannacry.html

This piece by Bruce Schneier needs debunking. I thought I’d list the things wrong with it.

The NSA 0day debate

Schneier’s description of the problem is deceptive:

When the US government discovers a vulnerability in a piece of software, however, it decides between two competing equities. It can keep it secret and use it offensively, to gather foreign intelligence, help execute search warrants, or deliver malware. Or it can alert the software vendor and see that the vulnerability is patched, protecting the country — and, for that matter, the world — from similar attacks by foreign governments and cybercriminals. It’s an either-or choice.

The government doesn’t “discover” vulnerabilities accidentally. Instead, when the NSA has a need for something specific, it acquires the 0day, either through internal research or (more often) buying from independent researchers.

The value of something is what you are willing to pay for it. If the NSA comes across a vulnerability accidentally, then the value to them is nearly zero. Obviously such vulns should be disclosed and fixed. Conversely, if the NSA is willing to pay $1 million to acquire a specific vuln for imminent use against a target, the offensive value is much greater than the fix value.

What Schneier is doing is deliberately confusing the two, combing the policy for accidentally found vulns with deliberately acquired vulns.

The above paragraph should read instead:

When the government discovers a vulnerability accidentally, it then decides to alert the software vendor to get it patched. When the government decides it needs as vuln for a specific offensive use, it acquires one that meets its needs, uses it, and keeps it secret. After spending so much money acquiring an offensive vuln, it would obviously be stupid to change this decision and not use it offensively.

Hoarding vulns

Schneier also says the NSA is “hoarding” vulns. The word has a couple inaccurate connotations.
One connotation is that the NSA is putting them on a heap inside a vault, not using them. The opposite is true: the NSA only acquires vulns it for which it has an active need. It uses pretty much all the vulns it acquires. That can be seen in the ShadowBroker dump, all the vulns listed are extremely useful to attackers, especially ETERNALBLUE. Efficiency is important to the NSA. Your efficiency is your basis for promotion. There are other people who make their careers finding waste in the NSA. If you are hoarding vulns and not using them, you’ll quickly get ejected from the NSA.
Another connotation is that the NSA is somehow keeping the vulns away from vendors. That’s like saying I’m hoarding naked selfies of myself. Yes, technically I’m keeping them away from you, but it’s not like they ever belong to you in the first place. The same is true the NSA. Had it never acquired the ETERNALBLUE 0day, it never would’ve been researched, never found.

The VEP

Schneier describes the “Vulnerability Equities Process” or “VEP”, a process that is supposed to manage the vulnerabilities the government gets.

There’s no evidence the VEP process has ever been used, at least not with 0days acquired by the NSA. The VEP allows exceptions for important vulns, and all the NSA vulns are important, so all are excepted from the process. Since the NSA is in charge of the VEP, of course, this is at the sole discretion of the NSA. Thus, the entire point of the VEP process goes away.

Moreover, it can’t work in many cases. The vulns acquired by the NSA often come with clauses that mean they can’t be shared.

New classes of vulns

One reason sellers forbid 0days from being shared is because they use new classes of vulnerabilities, such that sharing one 0day will effectively ruin a whole set of vulnerabilities. Schneier poo-poos this because he doesn’t see new classes of vulns in the ShadowBroker set.
This is wrong for two reasons. The first is that the ShadowBroker 0days are incomplete. There’s no iOS exploits, for example, and we know that iOS is a big target of the NSA.
Secondly, I’m not sure we’ve sufficiently analyzed the ShadowBroker exploits yet to realize there may be a new class of vuln. It’s easy to miss the fact that a single bug we see in the dump may actually be a whole new class of vulnerability. In the past, it’s often been the case that a new class was named only after finding many examples.
In any case, Schneier misses the point denying new classes of vulns exist. He should instead use the point to prove the value of disclosure, that instead of playing wack-a-mole fixing bugs one at a time, vendors would be able to fix whole classes of bugs at once.

Rediscovery

Schneier cites two studies that looked at how often vulnerabilities get rediscovered. In other words, he’s trying to measure the likelihood that some other government will find the bug and use it against us.
These studies are weak, scarcely better than anecdotal evidence. Schneier’s own study seems almost unrelated to the problem, and the Rand’s study cannot be replicated, as it relies upon private data. Also, there is little differentiation between important bugs (like SMB/MSRPC exploits and full-chain iOS exploits) and lesser bugs.
Whether from the Rand study or from anecdotes, we have good reason to believe that the longer an 0day exists, the less likely it’ll be rediscovered. Schneier argues that vulns should only be used for 6 months before being disclosed to a vendor. Anecdotes suggest otherwise, that if it hasn’t been rediscovered in the first year, it likely won’t ever be.
The Rand study was overwhelmingly clear on the issue that 0days are dramatically more likely to become obsolete than be rediscovered. The latest update to iOS will break an 0day, rather than somebody else rediscovering it. Win10 adoption will break older SMB exploits faster than rediscovery.
In any case, this post is about ETERNALBLUE specifically. What we learned from this specific bug is that it was used for at least 5 year without anybody else rediscovering it (before it was leaked). Chances are good it never would’ve been rediscovered, just made obsolete by Win10.

Notification is notification

All disclosure has the potential of leading to worms like WannaCry. The Conficker worm of 2008, for example, was written after Microsoft patched the underlying vulnerability.
Thus, had the NSA disclosed the bug in the normal way, chances are good it still would’ve been used for worming ransomware.
Yes, WannaCry had a head-start because ShadowBrokers published a working exploit, but this doesn’t appear to have made a difference. The Blaster worm (the first worm to compromise millions of computers) took roughly the same amount of time to create, and almost no details were made public about the vulnerability, other than the fact it was patched. (I know from personal experience — we used diff to find what changed in the patch in order to reverse engineer the 0day).
In other words, the damage the NSA is responsible for isn’t really the damage that came after it was patched — that was likely to happen anyway, as it does with normal vuln disclosure. Instead, the only damage the NSA can truly be held responsible for is the damage ahead of time, such as the months (years?) the ShadowBrokers possessed the exploits before they were patched.

Disclosed doesn’t mean fixed

One thing we’ve learned from 30 years of disclosure is that vendors ignore bugs.
We’ve gotten to the state where a few big companies like Microsoft and Apple will actually fix bugs, but the vast majority of vendors won’t. Even Microsoft and Apple have been known to sit on tricky bugs for over a year before fixing them.
And the only reason Microsoft and Apple have gotten to this state is because we, the community, bullied them into it. When we disclose bugs to them, we give them a deadline when we make the bug public, whether or not its been fixed.
The same goes for the NSA. If they quietly disclose bugs to vendors, in general, they won’t be fixed unless the NSA also makes the bug public within a certain time frame. Either Schneier has to argue that the NSA should do such public full-disclosures, or argue that disclosures won’t always lead to fixes.

Replacement SMB/MSRPC

The ETERNALBLUE vuln is so valuable to the NSA that it’s almost certainly seeking a replacement.
Again, I’m trying to debunk the impression Schneier tries to form that somehow the NSA stumbled upon ETERNALBLUE by accident to begin with. The opposite is true: remote exploits for the SMB (port 445) or MSRPC (port 135) services are some of the most valuable vulns, and the NSA will work hard to acquire them.

That it was leaked

The only issue here is that the 0day leaked. If the NSA can’t keep it’s weaponized toys secret, then maybe it shouldn’t have them.
Instead of processing this new piece of information, which is important, Schneier takes this opportunity to just re-hash the old inaccurate and deceptive VEP debate.

Conclusion

Except for a tiny number of people working for the NSA, none of us really know what’s going on with 0days inside government. Schneier’s comments seem more off-base than most. Like all activists, he deliberately uses language to deceive rather than explain (like “discover” instead of “acquire”). Like all activists, he seems obsessed with the VEP, even though as far as anybody can tell, it’s not used for NSA acquired vulns. He deliberate ignores things he should be an expert in, such as how all patches/disclosures sometimes lead to worms/exploits, and how not all disclosure leads to fixes.

Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3

Post Syndicated from Illya Yalovyy original https://aws.amazon.com/blogs/big-data/seven-tips-for-using-s3distcp-on-amazon-emr-to-move-data-efficiently-between-hdfs-and-amazon-s3/

Have you ever needed to move a large amount of data between Amazon S3 and Hadoop Distributed File System (HDFS) but found that the data set was too large for a simple copy operation? EMR can help you with this. In addition to processing and analyzing petabytes of data, EMR can move large amounts of data.

In the Hadoop ecosystem, DistCp is often used to move data. DistCp provides a distributed copy capability built on top of a MapReduce framework. S3DistCp is an extension to DistCp that is optimized to work with S3 and that adds several useful features. In addition to moving data between HDFS and S3, S3DistCp is also a Swiss Army knife of file manipulations. In this post we’ll cover the following tips for using S3DistCp, starting with basic use cases and then moving to more advanced scenarios:

1. Copy or move files without transformation
2. Copy and change file compression on the fly
3. Copy files incrementally
4. Copy multiple folders in one job
5. Aggregate files based on a pattern
6. Upload files larger than 1 TB in size
7. Submit a S3DistCp step to an EMR cluster

1. Copy or move files without transformation

We’ve observed that customers often use S3DistCp to copy data from one storage location to another, whether S3 or HDFS. Syntax for this operation is simple and straightforward:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table

The source location may contain extra files that we don’t necessarily want to copy. Here, we can use filters based on regular expressions to do things such as copying files with the .log extension only.

Each subfolder has the following files:

$ hadoop fs -ls /data/incoming/hourly_table/2017-02-01/03
Found 8 items
-rw-r--r--   1 hadoop hadoop     197850 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.25845.log
-rw-r--r--   1 hadoop hadoop     484006 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.32953.log
-rw-r--r--   1 hadoop hadoop     868522 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.62649.log
-rw-r--r--   1 hadoop hadoop     408072 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.64637.log
-rw-r--r--   1 hadoop hadoop    1031949 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.70767.log
-rw-r--r--   1 hadoop hadoop     368240 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.89910.log
-rw-r--r--   1 hadoop hadoop     437348 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.96053.log
-rw-r--r--   1 hadoop hadoop        800 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/processing.meta

To copy only the required files, let’s use the --srcPattern option:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table_filtered --srcPattern .*\.log

After the upload has finished successfully, let’s check the folder contents in the destination location to confirm only the files ending in .log were copied:

$ hadoop fs -ls s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03
-rw-rw-rw-   1     197850 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.25845.log
-rw-rw-rw-   1     484006 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.32953.log
-rw-rw-rw-   1     868522 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.62649.log
-rw-rw-rw-   1     408072 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.64637.log
-rw-rw-rw-   1    1031949 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.70767.log
-rw-rw-rw-   1     368240 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.89910.log
-rw-rw-rw-   1     437348 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.96053.log

Sometimes, data needs to be moved instead of copied. In this case, we can use the --deleteOnSuccess option. This option is similar to aws s3 mv, which you might have used previously with the AWS CLI. The files are first copied and then deleted from the source:

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table_archive --deleteOnSuccess

After the preceding operation, the source location has only empty folders, and the target location contains all files.

$ hadoop fs -ls -R s3://my-tables/incoming/hourly_table/2017-02-01/
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/00
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/01
...
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/21
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/22


$ hadoop fs -ls s3://my-tables/incoming/hourly_table_archive/2017-02-01/01
-rw-rw-rw-   1     676756 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/2017-02-01.01.27047.log
-rw-rw-rw-   1     780197 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/2017-02-01.01.59789.log
-rw-rw-rw-   1    1041789 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/2017-02-01.01.82293.log
-rw-rw-rw-   1        400 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/processing.meta

The important things to remember here are that S3DistCp deletes only files with the --deleteOnSuccess flag and that it doesn’t delete parent folders, even when they are empty.

2. Copy and change file compression on the fly

Raw files often land in S3 or HDFS in an uncompressed text format. This format is suboptimal both for the cost of storage and for running analytics on that data. S3DistCp can help you efficiently store data and compress files on the fly with the --outputCodec option:

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table_filtered --dest s3://my-tables/incoming/hourly_table_gz --outputCodec=gz

The current version of S3DistCp supports the codecs gzip, gz, lzo, lzop, and snappy, and the keywords none and keep (the default). These keywords have the following meaning:

  • none” – Save files uncompressed. If the files are compressed, then S3DistCp decompresses them.
  • keep” – Don’t change the compression of the files but copy them as-is.

Let’s check the files in the target folder, which have now been compressed with the gz codec:

$ hadoop fs -ls s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/
Found 3 items
-rw-rw-rw-   1     78756 2017-02-20 00:07 s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/2017-02-01.01.27047.log.gz
-rw-rw-rw-   1     80197 2017-02-20 00:07 s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/2017-02-01.01.59789.log.gz
-rw-rw-rw-   1    121178 2017-02-20 00:07 s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/2017-02-01.01.82293.log.gz

3. Copy files incrementally

In real life, the upstream process drops files in some cadence. For instance, new files might get created every hour, or every minute. The downstream process can be configured to pick it up at a different schedule.

Let’s say data lands on S3 and we want to process it on HDFS daily. Copying all files every time doesn’t scale very well. Fortunately, S3DistCp has a built-in solution for that.

For this solution, we use a manifest file. That file allows S3DistCp to keep track of copied files. Following is an example of the command:

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table --dest s3://my-tables/processing/hourly_table --srcPattern .*\.log --outputManifest=manifest-2017-02-25.gz --previousManifest=s3://my-tables/processing/hourly_table/manifest-2017-02-24.gz

The command takes two manifest files as parameters, outputManifest and previousManifest. The first one contains a list of all copied files (old and new), and the second contains a list of files copied previously. This way, we can recreate the full history of operations and see what files were copied during each run:

$ hadoop fs -text s3://my-tables/processing/hourly_table/manifest-2017-02-24.gz > previous.lst
$ hadoop fs -text s3://my-tables/processing/hourly_table/manifest-2017-02-25.gz > current.lst
$ diff previous.lst current.lst
2548a2549,2550
> {"path":"s3://my-tables/processing/hourly_table/2017-02-25/00/2017-02-15.00.50958.log","baseName":"2017-02-25/00/2017-02-15.00.50958.log","srcDir":"s3://my-tables/processing/hourly_table","size":610308}
> {"path":"s3://my-tables/processing/hourly_table/2017-02-25/00/2017-02-25.00.93423.log","baseName":"2017-02-25/00/2017-02-25.00.93423.log","srcDir":"s3://my-tables/processing/hourly_table","size":178928}

S3DistCp creates the file in the local file system using the provided path, /tmp/mymanifest.gz. When the copy operation finishes, it moves that manifest to <DESTINATION LOCATION>.

4. Copy multiple folders in one job

Imagine that we need to copy several folders. Usually, we run as many copy jobs as there are folders that need to be copied. With S3DistCp, the copy can be done in one go. All we need is to prepare a file with list of prefixes and use it as a parameter for the tool:

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table_filtered --dest s3://my-tables/processing/sample_table --srcPrefixesFile file://${PWD}/folders.lst

In this case, the folders.lst file contains the following prefixes:

$ cat folders.lst
s3://my-tables/incoming/hourly_table_filtered/2017-02-10/11
s3://my-tables/incoming/hourly_table_filtered/2017-02-19/02
s3://my-tables/incoming/hourly_table_filtered/2017-02-23

As a result, the target location has only the requested subfolders:

$ hadoop fs -ls -R s3://my-tables/processing/sample_table
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-10
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-10/11
-rw-rw-rw-   1     139200 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-10/11/2017-02-10.11.12980.log
...
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-19
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-19/02
-rw-rw-rw-   1     702058 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-19/02/2017-02-19.02.19497.log
-rw-rw-rw-   1     265404 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-19/02/2017-02-19.02.26671.log
...
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-23
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-23/00
-rw-rw-rw-   1     310425 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-23/00/2017-02-23.00.10061.log
-rw-rw-rw-   1    1030397 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-23/00/2017-02-23.00.22664.log
...

5. Aggregate files based on a pattern

Hadoop is optimized for reading a fewer number of large files rather than many small files, whether from S3 or HDFS. You can use S3DistCp to aggregate small files into fewer large files of a size that you choose, which can optimize your analysis for both performance and cost.

In the following example, we combine small files into bigger files. We do so by using a regular expression with the –groupBy option.

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/processing/daily_table --targetSize=10 --groupBy=’.*/hourly_table/.*/(\d\d)/.*\.log’

Let’s take a look into the target folders and compare them to the corresponding source folders:

$ hadoop fs -ls /data/incoming/hourly_table/2017-02-22/05/
Found 8 items
-rw-r--r--   1 hadoop hadoop     289949 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.11125.log
-rw-r--r--   1 hadoop hadoop     407290 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.19596.log
-rw-r--r--   1 hadoop hadoop     253434 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.30135.log
-rw-r--r--   1 hadoop hadoop     590655 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.36531.log
-rw-r--r--   1 hadoop hadoop     762076 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.47822.log
-rw-r--r--   1 hadoop hadoop     489783 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.80518.log
-rw-r--r--   1 hadoop hadoop     205976 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.99127.log
-rw-r--r--   1 hadoop hadoop        800 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/processing.meta

 

$ hadoop fs -ls s3://my-tables/processing/daily_table/2017-02-22/05/
Found 2 items
-rw-rw-rw-   1   10541944 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/054
-rw-rw-rw-   1   10511516 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/055

As you can see, seven data files were combined into two with a size close to the requested 10 MB. The *.meta file was filtered out because --groupBy pattern works in a similar way to –srcPattern. We recommend keeping files larger than the default block size, which is 128 MB on EMR.

The name of the final file is composed of groups in the regular expression used in --groupBy plus some number to make the name unique. The pattern must have at least one group defined.

Let’s consider one more example. This time, we want the file name to be formed from three parts: year, month, and file extension (.log in this case). Here is an updated command:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/processing/daily_table_2017 --targetSize=10 --groupBy=’.*/hourly_table/.*(2017-).*/(\d\d)/.*\.(log)’

Now we have final files named in a different way:

$ hadoop fs -ls s3://my-tables/processing/daily_table_2017/2017-02-22/05/
Found 2 items
-rw-rw-rw-   1   10541944 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/2017-05log4
-rw-rw-rw-   1   10511516 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/2017-05log5

As you can see, names of final files consist of concatenation of 3 groups from the regular expression (2017-), (\d\d), (log).

You might find that occasionally you get an error that looks like the following:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/processing/daily_table_2017 --targetSize=10 --groupBy=’.*/hourly_table/.*(2018-).*/(\d\d)/.*\.(log)’
...
17/04/27 15:37:45 INFO S3DistCp.S3DistCp: Created 0 files to copy 0 files
... 
Exception in thread “main” java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.S3DistCp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.S3DistCp.S3DistCp.run(S3DistCp.java:705)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.S3DistCp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
…

In this case, the key information is contained in Created 0 files to copy 0 files. S3DistCp didn’t find any files to copy because the regular expression in the --groupBy option doesn’t match any files in the source location.

The reason for this issue varies. For example, it can be a mistake in the specified pattern. In the preceding example, we don’t have any files for the year 2018. Another common reason is incorrect escaping of the pattern when we submit S3DistCp command as a step, which is addressed later later in this post.

6. Upload files larger than 1 TB in size

The default upload chunk size when doing an S3 multipart upload is 128 MB. When files are larger than 1 TB, the total number of parts can reach over 10,000. Such a large number of parts can make the job run for a very long time or even fail.

In this case, you can improve job performance by increasing the size of each part. In S3DistCp, you can do this by using the --multipartUploadChunkSize option.

Let’s test how it works on several files about 200 GB in size. With the default part size, it takes about 84 minutes to copy them to S3 from HDFS.

We can increase the default part size to 1000 MB:

$ time s3-dist-cp --src /data/gb200 --dest s3://my-tables/data/S3DistCp/gb200_2 --multipartUploadChunkSize=1000
...
real    41m1.616s

The maximum part size is 5 GB. Keep in mind that larger parts have a higher chance to fail during upload and don’t necessarily speed up the process. Let’s run the same job with the maximum part size:

time s3-dist-cp --src /data/gb200 --dest s3://my-tables/data/S3DistCp/gb200_2 --multipartUploadChunkSize=5000
...
real    40m17.331s

7. Submit a S3DistCp step to an EMR cluster

You can run the S3DistCp tool in several ways. First, you can SSH to the master node and execute the command in a terminal window as we did in the preceding examples. This approach might be convenient for many use cases, but sometimes you might want to create a cluster that has some data on HDFS. You can do this by submitting a step directly in the AWS Management Console when creating a cluster.

In the console add step dialog box, we can fill the fields in the following way:

  • Step type: Custom JAR
  • Name*: S3DistCp Stepli>
  • JAR location: command-runner.jar
  • Arguments: s3-dist-cp --src s3://my-tables/incoming/hourly_table --dest /data/input/hourly_table --targetSize 10 --groupBy .*/hourly_table/.*(2017-).*/(\d\d)/.*\.(log)
  • Action of failure: Continue

Notice that we didn’t add quotation marks around our pattern. We needed quotation marks when we were using bash in the terminal window, but not here. The console takes care of escaping and transferring our arguments to the command on the cluster.

Another common use case is to run S3DistCp recurrently or on some event. We can always submit a new step to the existing cluster. The syntax here is slightly different than in previous examples. We separate arguments by commas. In the case of a complex pattern, we shield the whole step option with single quotation marks:

aws emr add-steps --cluster-id j-ABC123456789Z --steps 'Name=LoadData,Jar=command-runner.jar,ActionOnFailure=CONTINUE,Type=CUSTOM_JAR,Args=s3-dist-cp,--src,s3://my-tables/incoming/hourly_table,--dest,/data/input/hourly_table,--targetSize,10,--groupBy,.*/hourly_table/.*(2017-).*/(\d\d)/.*\.(log)'

Summary

This post showed you the basics of how S3DistCp works and highlighted some of its most useful features. It covered how you can use S3DistCp to optimize for raw files of different sizes and also selectively copy different files between locations. We also looked at several options for using the tool from SSH, the AWS Management Console, and the AWS CLI.

If you have questions or suggestions, leave a message in the comments.


Next Steps

Take your new knowledge to the next level! Click on the post below and learn the top 10 tips to improve query performance in Amazon Athena.

Top 10 Performance Tuning Tips for Amazon Athena


About the Author

Illya Yalovyy is a Senior Software Development Engineer with Amazon Web Services. He works on cutting-edge features of EMR and is heavily involved in open source projects such as Apache Hive, Apache Zookeeper, Apache Sqoop. His spare time is completely dedicated to his children and family.

 

Building High-Throughput Genomic Batch Workflows on AWS: Batch Layer (Part 3 of 4)

Post Syndicated from Andy Katz original https://aws.amazon.com/blogs/compute/building-high-throughput-genomic-batch-workflows-on-aws-batch-layer-part-3-of-4/

Aaron Friedman is a Healthcare and Life Sciences Partner Solutions Architect at AWS

Angel Pizarro is a Scientific Computing Technical Business Development Manager at AWS

This post is the third in a series on how to build a genomics workflow on AWS. In Part 1, we introduced a general architecture, shown below, and highlighted the three common layers in a batch workflow:

  • Job
  • Batch
  • Workflow

In Part 2, you built a Docker container for each job that needed to run as part of your workflow, and stored them in Amazon ECR.

In Part 3, you tackle the batch layer and build a scalable, elastic, and easily maintainable batch engine using AWS Batch.

AWS Batch enables developers, scientists, and engineers to easily and efficiently run hundreds of thousands of batch computing jobs on AWS. It dynamically provisions the optimal quantity and type of compute resources (for example, CPU or memory optimized instances) based on the volume and specific resource requirements of the batch jobs that you submit. With AWS Batch, you do not need to install and manage your own batch computing software or server clusters, which allows you to focus on analyzing results, such as those of your genomic analysis.

Integrating applications into AWS Batch

If you are new to AWS Batch, we recommend reading Setting Up AWS Batch to ensure that you have the proper permissions and AWS environment.

After you have a working environment, you define several types of resources:

  • IAM roles that provide service permissions
  • A compute environment that launches and terminates compute resources for jobs
  • A custom Amazon Machine Image (AMI)
  • A job queue to submit the units of work and to schedule the appropriate resources within the compute environment to execute those jobs
  • Job definitions that define how to execute an application

After the resources are created, you’ll test the environment and create an AWS Lambda function to send generic jobs to the queue.

This genomics workflow covers the basic steps. For more information, see Getting Started with AWS Batch.

Creating the necessary IAM roles

AWS Batch simplifies batch processing by managing a number of underlying AWS services so that you can focus on your applications. As a result, you create IAM roles that give the service permissions to act on your behalf. In this section, deploy the AWS CloudFormation template included in the GitHub repository and extract the ARNs for later use.

To deploy the stack, go to the top level in the repo with the following command:

aws cloudformation create-stack --template-body file://batch/setup/iam.template.yaml --stack-name iam --capabilities CAPABILITY_NAMED_IAM

You can capture the output from this stack in the Outputs tab in the CloudFormation console:

Creating the compute environment

In AWS Batch, you will set up a managed compute environments. Managed compute environments automatically launch and terminate compute resources on your behalf based on the aggregate resources needed by your jobs, such as vCPU and memory, and simple boundaries that you define.

When defining your compute environment, specify the following:

  • Desired instance types in your environment
  • Min and max vCPUs in the environment
  • The Amazon Machine Image (AMI) to use
  • Percentage value for bids on the Spot Market and VPC subnets that can be used.

AWS Batch then provisions an elastic and heterogeneous pool of Amazon EC2 instances based on the aggregate resource requirements of jobs sitting in the RUNNABLE state. If a mix of CPU and memory-intensive jobs are ready to run, AWS Batch provisions the appropriate ratio and size of CPU and memory-optimized instances within your environment. For this post, you will use the simplest configuration, in which instance types are set to "optimal" allowing AWS Batch to choose from the latest C, M, and R EC2 instance families.

While you could create this compute environment in the console, we provide the following CLI commands. Replace the subnet IDs and key name with your own private subnets and key, and the image-id with the image you will build in the next section.

ACCOUNTID=<your account id>
SERVICEROLE=<from output in CloudFormation template>
IAMFLEETROLE=<from output in CloudFormation template>
JOBROLEARN=<from output in CloudFormation template>
SUBNETS=<comma delimited list of subnets>
SECGROUPS=<your security groups>
SPOTPER=50 # percentage of on demand
IMAGEID=<ami-id corresponding to the one you created>
INSTANCEROLE=<from output in CloudFormation template>
REGISTRY=${ACCOUNTID}.dkr.ecr.us-east-1.amazonaws.com
KEYNAME=<your key name>
MAXCPU=1024 # max vCPUs in compute environment
ENV=myenv

# Creates the compute environment
aws batch create-compute-environment --compute-environment-name genomicsEnv-$ENV --type MANAGED --state ENABLED --service-role ${SERVICEROLE} --compute-resources type=SPOT,minvCpus=0,maxvCpus=$MAXCPU,desiredvCpus=0,instanceTypes=optimal,imageId=$IMAGEID,subnets=$SUBNETS,securityGroupIds=$SECGROUPS,ec2KeyPair=$KEYNAME,instanceRole=$INSTANCEROLE,bidPercentage=$SPOTPER,spotIamFleetRole=$IAMFLEETROLE

Creating the custom AMI for AWS Batch

While you can use default Amazon ECS-optimized AMIs with AWS Batch, you can also provide your own image in managed compute environments. We will use this feature to provision additional scratch EBS storage on each of the instances that AWS Batch launches and also to encrypt both the Docker and scratch EBS volumes.

AWS Batch has the same requirements for your AMI as Amazon ECS. To build the custom image, modify the default Amazon ECS-Optimized Amazon Linux AMI in the following ways:

  • Attach a 1 TB scratch volume to /dev/sdb
  • Encrypt the Docker and new scratch volumes
  • Mount the scratch volume to /docker_scratch by modifying /etcfstab

The first two tasks can be addressed when you create the custom AMI in the console. Spin up a small t2.micro instance, and proceed through the standard EC2 instance launch.

After your instance has launched, record the IP address and then SSH into the instance. Copy and paste the following code:

sudo yum -y update
sudo parted /dev/xvdb mklabel gpt
sudo parted /dev/xvdb mkpart primary 0% 100%
sudo mkfs -t ext4 /dev/xvdb1
sudo mkdir /docker_scratch
sudo echo -e '/dev/xvdb1\t/docker_scratch\text4\tdefaults\t0\t0' | sudo tee -a /etc/fstab
sudo mount -a

This auto-mounts your scratch volume to /docker_scratch, which is your scratch directory for batch processing. Next, create your new AMI and record the image ID.

Creating the job queues

AWS Batch job queues are used to coordinate the submission of batch jobs. Your jobs are submitted to job queues, which can be mapped to one or more compute environments. Job queues have priority relative to each other. You can also specify the order in which they consume resources from your compute environments.

In this solution, use two job queues. The first is for high priority jobs, such as alignment or variant calling. Set this with a high priority (1000) and map back to the previously created compute environment. Next, set a second job queue for low priority jobs, such as quality statistics generation. To create these compute environments, enter the following CLI commands:

aws batch create-job-queue --job-queue-name highPriority-${ENV} --compute-environment-order order=0,computeEnvironment=genomicsEnv-${ENV}  --priority 1000 --state ENABLED
aws batch create-job-queue --job-queue-name lowPriority-${ENV} --compute-environment-order order=0,computeEnvironment=genomicsEnv-${ENV}  --priority 1 --state ENABLED

Creating the job definitions

To run the Isaac aligner container image locally, supply the Amazon S3 locations for the FASTQ input sequences, the reference genome to align to, and the output BAM file. For more information, see tools/isaac/README.md.

The Docker container itself also requires some information on a suitable mountable volume so that it can read and write files temporary files without running out of space.

Note: In the following example, the FASTQ files as well as the reference files to run are in a publicly available bucket.

FASTQ1=s3://aws-batch-genomics-resources/fastq/SRR1919605_1.fastq.gz
FASTQ2=s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz
REF=s3://aws-batch-genomics-resources/reference/isaac/
BAM=s3://mybucket/genomic-workflow/test_results/bam/

mkdir ~/scratch

docker run --rm -ti -v $(HOME)/scratch:/scratch $REPO_URI --bam_s3_folder_path $BAM \
--fastq1_s3_path $FASTQ1 \
--fastq2_s3_path $FASTQ2 \
--reference_s3_path $REF \
--working_dir /scratch 

Locally running containers can typically expand their CPU and memory resource headroom. In AWS Batch, the CPU and memory requirements are hard limits and are allocated to the container image at runtime.

Isaac is a fairly resource-intensive algorithm, as it creates an uncompressed index of the reference genome in memory to match the query DNA sequences. The large memory space is shared across multiple CPU threads, and Isaac can scale almost linearly with the number of CPU threads given to it as a parameter.

To fit these characteristics, choose an optimal instance size to maximize the number of CPU threads based on a given large memory footprint, and deploy a Docker container that uses all of the instance resources. In this case, we chose a host instance with 80+ GB of memory and 32+ vCPUs. The following code is example JSON that you can pass to the AWS CLI to create a job definition for Isaac.

aws batch register-job-definition --job-definition-name isaac-${ENV} --type container --retry-strategy attempts=3 --container-properties '
{"image": "'${REGISTRY}'/isaac",
"jobRoleArn":"'${JOBROLEARN}'",
"memory":80000,
"vcpus":32,
"mountPoints": [{"containerPath": "/scratch", "readOnly": false, "sourceVolume": "docker_scratch"}],
"volumes": [{"name": "docker_scratch", "host": {"sourcePath": "/docker_scratch"}}]
}'

You can copy and paste the following code for the other three job definitions:

aws batch register-job-definition --job-definition-name strelka-${ENV} --type container --retry-strategy attempts=3 --container-properties '
{"image": "'${REGISTRY}'/strelka",
"jobRoleArn":"'${JOBROLEARN}'",
"memory":32000,
"vcpus":32,
"mountPoints": [{"containerPath": "/scratch", "readOnly": false, "sourceVolume": "docker_scratch"}],
"volumes": [{"name": "docker_scratch", "host": {"sourcePath": "/docker_scratch"}}]
}'

aws batch register-job-definition --job-definition-name snpeff-${ENV} --type container --retry-strategy attempts=3 --container-properties '
{"image": "'${REGISTRY}'/snpeff",
"jobRoleArn":"'${JOBROLEARN}'",
"memory":10000,
"vcpus":4,
"mountPoints": [{"containerPath": "/scratch", "readOnly": false, "sourceVolume": "docker_scratch"}],
"volumes": [{"name": "docker_scratch", "host": {"sourcePath": "/docker_scratch"}}]
}'

aws batch register-job-definition --job-definition-name samtoolsStats-${ENV} --type container --retry-strategy attempts=3 --container-properties '
{"image": "'${REGISTRY}'/samtools_stats",
"jobRoleArn":"'${JOBROLEARN}'",
"memory":10000,
"vcpus":4,
"mountPoints": [{"containerPath": "/scratch", "readOnly": false, "sourceVolume": "docker_scratch"}],
"volumes": [{"name": "docker_scratch", "host": {"sourcePath": "/docker_scratch"}}]
}'

The value for "image" comes from the previous post on creating a Docker image and publishing to ECR. The value for jobRoleArn you can find from the output of the CloudFormation template that you deployed earlier. In addition to providing the number of CPU cores and memory required by Isaac, you also give it a storage volume for scratch and staging. The volume comes from the previously defined custom AMI.

Testing the environment

After you have created the Isaac job definition, you can submit the job using the AWS Batch submitJob API action. While the base mappings for Docker run are taken care of in the job definition that you just built, the specific job parameters should be specified in the container overrides section of the API call. Here’s what this would look like in the CLI, using the same parameters as in the bash commands shown earlier:

aws batch submit-job --job-name testisaac --job-queue highPriority-${ENV} --job-definition isaac-${ENV}:1 --container-overrides '{
"command": [
			"--bam_s3_folder_path", "s3://mybucket/genomic-workflow/test_batch/bam/",
            "--fastq1_s3_path", "s3://aws-batch-genomics-resources/fastq/ SRR1919605_1.fastq.gz",
            "--fastq2_s3_path", "s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz",
            "--reference_s3_path", "s3://aws-batch-genomics-resources/reference/isaac/",
            "--working_dir", "/scratch",
			"—cmd_args", " --exome ",]
}'

When you execute a submitJob call, jobId is returned. You can then track the progress of your job using the describeJobs API action:

aws batch describe-jobs –jobs <jobId returned from submitJob>

You can also track the progress of all of your jobs in the AWS Batch console dashboard.

To see exactly where a RUNNING job is at, use the link in the AWS Batch console to direct you to the appropriate location in CloudWatch logs.

Completing the batch environment setup

To finish, create a Lambda function to submit a generic AWS Batch job.

In the Lambda console, create a Python 2.7 Lambda function named batchSubmitJob. Copy and paste the following code. This is similar to the batch-submit-job-python27 Lambda blueprint. Use the LambdaBatchExecutionRole that you created earlier. For more information about creating functions, see Step 2.1: Create a Hello World Lambda Function.

from __future__ import print_function

import json
import boto3

batch_client = boto3.client('batch')

def lambda_handler(event, context):
    # Log the received event
    print("Received event: " + json.dumps(event, indent=2))
    # Get parameters for the SubmitJob call
    # http://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html
    job_name = event['jobName']
    job_queue = event['jobQueue']
    job_definition = event['jobDefinition']
    
    # containerOverrides, dependsOn, and parameters are optional
    container_overrides = event['containerOverrides'] if event.get('containerOverrides') else {}
    parameters = event['parameters'] if event.get('parameters') else {}
    depends_on = event['dependsOn'] if event.get('dependsOn') else []
    
    try:
        response = batch_client.submit_job(
            dependsOn=depends_on,
            containerOverrides=container_overrides,
            jobDefinition=job_definition,
            jobName=job_name,
            jobQueue=job_queue,
            parameters=parameters
        )
        
        # Log response from AWS Batch
        print("Response: " + json.dumps(response, indent=2))
        
        # Return the jobId
        event['jobId'] = response['jobId']
        return event
    
    except Exception as e:
        print(e)
        message = 'Error getting Batch Job status'
        print(message)
        raise Exception(message)

Conclusion

In part 3 of this series, you successfully set up your data processing, or batch, environment in AWS Batch. We also provided a Python script in the corresponding GitHub repo that takes care of all of the above CLI arguments for you, as well as building out the job definitions for all of the jobs in the workflow: Isaac, Strelka, SAMtools, and snpEff. You can check the script’s README for additional documentation.

In Part 4, you’ll cover the workflow layer using AWS Step Functions and AWS Lambda.

Please leave any questions and comments below.

AWS Online Tech Talks – June 2017

Post Syndicated from Tara Walker original https://aws.amazon.com/blogs/aws/aws-online-tech-talks-june-2017/

As the sixth month of the year, June is significant in that it is not only my birth month (very special), but it contains the summer solstice in the Northern Hemisphere, the day with the most daylight hours, and the winter solstice in the Southern Hemisphere, the day with the fewest daylight hours. In the United States, June is also the month in which we celebrate our dads with Father’s Day and have month-long celebrations of music, heritage, and the great outdoors.

Therefore, the month of June can be filled with lots of excitement. So why not add even more delight to the month, by enhancing your cloud computing skills. This month’s AWS Online Tech Talks features sessions on Artificial Intelligence (AI), Storage, Big Data, and Compute among other great topics.

June 2017 – Schedule

Noted below are the upcoming scheduled live, online technical sessions being held during the month of June. Make sure to register ahead of time so you won’t miss out on these free talks conducted by AWS subject matter experts. All schedule times for the online tech talks are shown in the Pacific Time (PDT) time zone.

Webinars featured this month are:

Thursday, June 1

Storage

9:00 AM – 10:00 AM: Deep Dive on Amazon Elastic File System

Big Data

10:30 AM – 11:30 AM: Migrating Big Data Workloads to Amazon EMR

Serverless

12:00 Noon – 1:00 PM: Building AWS Lambda Applications with the AWS Serverless Application Model (AWS SAM)

 

Monday, June 5

Artificial Intelligence

9:00 AM – 9:40 AM: Exploring the Business Use Cases for Amazon Lex

 

Tuesday, June 6

Management Tools

9:00 AM – 9:40 AM: Automated Compliance and Governance with AWS Config and AWS CloudTrail

 

Wednesday, June 7

Storage

9:00 AM – 9:40 AM: Backing up Amazon EC2 with Amazon EBS Snapshots

Big Data

10:30 AM – 11:10 AM: Intro to Amazon Redshift Spectrum: Quickly Query Exabytes of Data in S3

DevOps

12:00 Noon – 12:40 PM: Introduction to AWS CodeStar: Quickly Develop, Build, and Deploy Applications on AWS

 

Thursday, June 8

Artificial Intelligence

9:00 AM – 9:40 AM: Exploring the Business Use Cases for Amazon Polly

10:30 AM – 11:10 AM: Exploring the Business Use Cases for Amazon Rekognition

 

Monday, June 12

Artificial Intelligence

9:00 AM – 9:40 AM: Exploring the Business Use Cases for Amazon Machine Learning

 

Tuesday, June 13

Compute

9:00 AM – 9:40 AM: DevOps with Visual Studio, .NET and AWS

IoT

10:30 AM – 11:10 AM: Create, with Intel, an IoT Gateway and Establish a Data Pipeline to AWS IoT

Big Data

12:00 Noon – 12:40 PM: Real-Time Log Analytics using Amazon Kinesis and Amazon Elasticsearch Service

 

Wednesday, June 14

Containers

9:00 AM – 9:40 AM: Batch Processing with Containers on AWS

Security & Identity

12:00 Noon – 12:40 PM: Using Microsoft Active Directory across On-premises and Cloud Workloads

 

Thursday, June 15

Big Data

12:00 Noon – 1:00 PM: Building Big Data Applications with Serverless Architectures

 

Monday, June 19

Artificial Intelligence

9:00 AM – 9:40 AM: Deep Learning for Data Scientists: Using Apache MxNet and R on AWS

 

Tuesday, June 20

Storage

9:00 AM – 9:40 AM: Cloud Backup & Recovery Options with AWS Partner Solutions

Artificial Intelligence

10:30 AM – 11:10 AM: An Overview of AI on the AWS Platform

 

The AWS Online Tech Talks series covers a broad range of topics at varying technical levels. These sessions feature live demonstrations & customer examples led by AWS engineers and Solution Architects. Check out the AWS YouTube channel for more on-demand webinars on AWS technologies.

Tara

Build a Serverless Architecture to Analyze Amazon CloudFront Access Logs Using AWS Lambda, Amazon Athena, and Amazon Kinesis Analytics

Post Syndicated from Rajeev Srinivasan original https://aws.amazon.com/blogs/big-data/build-a-serverless-architecture-to-analyze-amazon-cloudfront-access-logs-using-aws-lambda-amazon-athena-and-amazon-kinesis-analytics/

Nowadays, it’s common for a web server to be fronted by a global content delivery service, like Amazon CloudFront. This type of front end accelerates delivery of websites, APIs, media content, and other web assets to provide a better experience to users across the globe.

The insights gained by analysis of Amazon CloudFront access logs helps improve website availability through bot detection and mitigation, optimizing web content based on the devices and browser used to view your webpages, reducing perceived latency by caching of popular object closer to its viewer, and so on. This results in a significant improvement in the overall perceived experience for the user.

This blog post provides a way to build a serverless architecture to generate some of these insights. To do so, we analyze Amazon CloudFront access logs both at rest and in transit through the stream. This serverless architecture uses Amazon Athena to analyze large volumes of CloudFront access logs (on the scale of terabytes per day), and Amazon Kinesis Analytics for streaming analysis.

The analytic queries in this blog post focus on three common use cases:

  1. Detection of common bots using the user agent string
  2. Calculation of current bandwidth usage per Amazon CloudFront distribution per edge location
  3. Determination of the current top 50 viewers

However, you can easily extend the architecture described to power dashboards for monitoring, reporting, and trigger alarms based on deeper insights gained by processing and analyzing the logs. Some examples are dashboards for cache performance, usage and viewer patterns, and so on.

Following we show a diagram of this architecture.

Prerequisites

Before you set up this architecture, install the AWS Command Line Interface (AWS CLI) tool on your local machine, if you don’t have it already.

Setup summary

The following steps are involved in setting up the serverless architecture on the AWS platform:

  1. Create an Amazon S3 bucket for your Amazon CloudFront access logs to be delivered to and stored in.
  2. Create a second Amazon S3 bucket to receive processed logs and store the partitioned data for interactive analysis.
  3. Create an Amazon Kinesis Firehose delivery stream to batch, compress, and deliver the preprocessed logs for analysis.
  4. Create an AWS Lambda function to preprocess the logs for analysis.
  5. Configure Amazon S3 event notification on the CloudFront access logs bucket, which contains the raw logs, to trigger the Lambda preprocessing function.
  6. Create an Amazon DynamoDB table to look up partition details, such as partition specification and partition location.
  7. Create an Amazon Athena table for interactive analysis.
  8. Create a second AWS Lambda function to add new partitions to the Athena table based on the log delivered to the processed logs bucket.
  9. Configure Amazon S3 event notification on the processed logs bucket to trigger the Lambda partitioning function.
  10. Configure Amazon Kinesis Analytics application for analysis of the logs directly from the stream.

ETL and preprocessing

In this section, we parse the CloudFront access logs as they are delivered, which occurs multiple times in an hour. We filter out commented records and use the user agent string to decipher the browser name, the name of the operating system, and whether the request has been made by a bot. For more details on how to decipher the preceding information based on the user agent string, see user-agents 1.1.0 in the Python documentation.

We use the Lambda preprocessing function to perform these tasks on individual rows of the access log. On successful completion, the rows are pushed to an Amazon Kinesis Firehose delivery stream to be persistently stored in an Amazon S3 bucket, the processed logs bucket.

To create a Firehose delivery stream with a new or existing S3 bucket as the destination, follow the steps described in Create a Firehose Delivery Stream to Amazon S3 in the S3 documentation. Keep most of the default settings, but select an AWS Identity and Access Management (IAM) role that has write access to your S3 bucket and specify GZIP compression. Name the delivery stream CloudFrontLogsToS3.

Another pre-requisite for this setup is to create an IAM role that provides the necessary permissions our AWS Lambda function to get the data from S3, process it, and deliver it to the CloudFrontLogsToS3 delivery stream.

Let’s use the AWS CLI to create the IAM role using the following the steps:

  1. Create the IAM policy (lambda-exec-policy) for the Lambda execution role to use.
  2. Create the Lambda execution role (lambda-cflogs-exec-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

To download the policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/preprocessiong-lambda/lambda-exec-policy.json  <path_on_your_local_machine>

To download the assume policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/preprocessiong-lambda/assume-lambda-policy.json  <path_on_your_local_machine>

Following is the lambda-exec-policy.json file, which is the IAM policy used by the Lambda execution role.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CloudWatchAccess",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Sid": "S3Access",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::*"
            ]
        },
        {
            "Sid": "FirehoseAccess",
            "Effect": "Allow",
            "Action": [
                "firehose:ListDeliveryStreams",
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:*:*:deliverystream/CloudFrontLogsToS3"
            ]
        }
    ]
}

To create the IAM policy used by Lambda execution role, type the following command.

aws iam create-policy --policy-name lambda-exec-policy --policy-document file://<path>/lambda-exec-policy.json

To create the AWS Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-cflogs-exec-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

Following is the assume-lambda-policy.json file, to grant Lambda permission to assume a role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

To attach the policy (lambda-exec-policy) created to the AWS Lambda execution role (lambda-cflogs-exec-role), type the following command.

aws iam attach-role-policy --role-name lambda-cflogs-exec-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-exec-policy

Now that we have created the CloudFrontLogsToS3 Firehose delivery stream and the lambda-cflogs-exec-role IAM role for Lambda, the next step is to create a Lambda preprocessing function.

This Lambda preprocessing function parses the CloudFront access logs delivered into the S3 bucket and performs a few transformation and mapping operations on the data. The Lambda function adds descriptive information, such as the browser and the operating system that were used to make this request based on the user agent string found in the logs. The Lambda function also adds information about the web distribution to support scenarios where CloudFront access logs are delivered to a centralized S3 bucket from multiple distributions. With the solution in this blog post, you can get insights across distributions and their edge locations.

Use the Lambda Management Console to create a new Lambda function with a Python 2.7 runtime and the s3-get-object-python blueprint. Open the console, and on the Configure triggers page, choose the name of the S3 bucket where the CloudFront access logs are delivered. Choose Put for Event type. For Prefix, type the name of the prefix, if any, for the folder where CloudFront access logs are delivered, for example cloudfront-logs/. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable trigger.

Choose Next and provide a function name to identify this Lambda preprocessing function.

For Code entry type, choose Upload a file from Amazon S3. For S3 link URL, type https.amazonaws.com//preprocessing-lambda/pre-data.zip. In the section, also create an environment variable with the key KINESIS_FIREHOSE_STREAM and a value with the name of the Firehose delivery stream as CloudFrontLogsToS3.

Choose lambda-cflogs-exec-role as the IAM role for the Lambda function, and type prep-data.lambda_handler for the value for Handler.

Choose Next, and then choose Create Lambda.

Table creation in Amazon Athena

In this step, we will build the Athena table. Use the Athena console in the same region and create the table using the query editor.

CREATE EXTERNAL TABLE IF NOT EXISTS cf_logs (
  logdate date,
  logtime string,
  location string,
  bytes bigint,
  requestip string,
  method string,
  host string,
  uri string,
  status bigint,
  referrer string,
  useragent string,
  uriquery string,
  cookie string,
  resulttype string,
  requestid string,
  header string,
  csprotocol string,
  csbytes string,
  timetaken bigint,
  forwardedfor string,
  sslprotocol string,
  sslcipher string,
  responseresulttype string,
  protocolversion string,
  browserfamily string,
  osfamily string,
  isbot string,
  filename string,
  distribution string
)
PARTITIONED BY(year string, month string, day string, hour string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION 's3://<pre-processing-log-bucket>/prefix/';

Creation of the Athena partition

A popular website with millions of requests each day routed using Amazon CloudFront can generate a large volume of logs, on the order of a few terabytes a day. We strongly recommend that you partition your data to effectively restrict the amount of data scanned by each query. Partitioning significantly improves query performance and substantially reduces cost. The Lambda partitioning function adds the partition information to the Athena table for the data delivered to the preprocessed logs bucket.

Before delivering the preprocessed Amazon CloudFront logs file into the preprocessed logs bucket, Amazon Kinesis Firehose adds a UTC time prefix in the format YYYY/MM/DD/HH. This approach supports multilevel partitioning of the data by year, month, date, and hour. You can invoke the Lambda partitioning function every time a new processed Amazon CloudFront log is delivered to the preprocessed logs bucket. To do so, configure the Lambda partitioning function to be triggered by an S3 Put event.

For a website with millions of requests, a large number of preprocessed logs can be delivered multiple times in an hour—for example, at the interval of one each second. To avoid querying the Athena table for partition information every time a preprocessed log file is delivered, you can create an Amazon DynamoDB table for fast lookup.

Based on the year, month, data and hour in the prefix of the delivered log, the Lambda partitioning function checks if the partition specification exists in the Amazon DynamoDB table. If it doesn’t, it’s added to the table using an atomic operation, and then the Athena table is updated.

Type the following command to create the Amazon DynamoDB table.

aws dynamodb create-table --table-name athenapartitiondetails \
--attribute-definitions AttributeName=PartitionSpec,AttributeType=S \
--key-schema AttributeName=PartitionSpec,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=100,WriteCapacityUnits=100

Here the following is true:

  • PartitionSpec is the hash key and is a representation of the partition signature—for example, year=”2017”; month=”05”; day=”15”; hour=”10”.
  • Depending on the rate at which the processed log files are delivered to the processed log bucket, you might have to increase the ReadCapacityUnits and WriteCapacityUnits values, if these are throttled.

The other attributes besides PartitionSpec are the following:

  • PartitionPath – The S3 path associated with the partition.
  • PartitionType – The type of partition used (Hour, Month, Date, Year, or ALL). In this case, ALL is used.

Next step is to create the IAM role to provide permissions for the Lambda partitioning function. You require permissions to do the following:

  1. Look up and write partition information to DynamoDB.
  2. Alter the Athena table with new partition information.
  3. Perform Amazon CloudWatch logs operations.
  4. Perform Amazon S3 operations.

To download the policy document to your local machine, type following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/lambda-partition-function-execution-policy.json  <path_on_your_local_machine>

To download the assume policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/assume-lambda-policy.json <path_on_your_local_machine>

To create the Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-cflogs-exec-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

Let’s use the AWS CLI to create the IAM role using the following three steps:

  1. Create the IAM policy(lambda-partition-exec-policy) used by the Lambda execution role.
  2. Create the Lambda execution role (lambda-partition-execution-role)and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

To create the IAM policy used by Lambda execution role, type the following command.

aws iam create-policy --policy-name lambda-partition-exec-policy --policy-document file://<path>/lambda-partition-function-execution-policy.json

To create the Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-partition-execution-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

To attach the policy (lambda-partition-exec-policy) created to the AWS Lambda execution role (lambda-partition-execution-role), type the following command.

aws iam attach-role-policy --role-name lambda-partition-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-partition-exec-policy

Following is the lambda-partition-function-execution-policy.json file, which is the IAM policy used by the Lambda execution role.

{
    "Version": "2012-10-17",
    "Statement": [
      	{
            	"Sid": "DDBTableAccess",
            	"Effect": "Allow",
            	"Action": "dynamodb:PutItem"
            	"Resource": "arn:aws:dynamodb*:*:table/athenapartitiondetails"
        	},
        	{
            	"Sid": "S3Access",
            	"Effect": "Allow",
            	"Action": [
                		"s3:GetBucketLocation",
                		"s3:GetObject",
                		"s3:ListBucket",
                		"s3:ListBucketMultipartUploads",
                		"s3:ListMultipartUploadParts",
                		"s3:AbortMultipartUpload",
                		"s3:PutObject"
            	],
          		"Resource":"arn:aws:s3:::*"
		},
	              {
		      "Sid": "AthenaAccess",
      		"Effect": "Allow",
      		"Action": [ "athena:*" ],
      		"Resource": [ "*" ]
	      },
        	{
            	"Sid": "CloudWatchLogsAccess",
            	"Effect": "Allow",
            	"Action": [
                		"logs:CreateLogGroup",
                		"logs:CreateLogStream",
             	   	"logs:PutLogEvents"
            	],
            	"Resource": "arn:aws:logs:*:*:*"
        	}
    ]
}

Download the .jar file containing the Java deployment package to your local machine.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/aws-lambda-athena-1.0.0.jar <path_on_your_local_machine>

From the AWS Management Console, create a new Lambda function with Java8 as the runtime. Select the Blank Function blueprint.

On the Configure triggers page, choose the name of the S3 bucket where the preprocessed logs are delivered. Choose Put for the Event Type. For Prefix, type the name of the prefix folder, if any, where preprocessed logs are delivered by Firehose—for example, out/. For Suffix, type the name of the compression format that the Firehose stream (CloudFrontLogToS3) delivers the preprocessed logs —for example, gz. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable Trigger.

Choose Next and provide a function name to identify this Lambda partitioning function.

Choose Java8 for Runtime for the AWS Lambda function. Choose Upload a .ZIP or .JAR file for the Code entry type, and choose Upload to upload the downloaded aws-lambda-athena-1.0.0.jar file.

Next, create the following environment variables for the Lambda function:

  • TABLE_NAME – The name of the Athena table (for example, cf_logs).
  • PARTITION_TYPE – The partition to be created based on the Athena table for the logs delivered to the sub folders in S3 bucket based on Year, Month, Date, Hour, or Set this to ALL to use Year, Month, Date, and Hour.
  • DDB_TABLE_NAME – The name of the DynamoDB table holding partition information (for example, athenapartitiondetails).
  • ATHENA_REGION – The current AWS Region for the Athena table to construct the JDBC connection string.
  • S3_STAGING_DIR – The Amazon S3 location where your query output is written. The JDBC driver asks Athena to read the results and provide rows of data back to the user (for example, s3://<bucketname>/<folder>/).

To configure the function handler and IAM, for Handler copy and paste the name of the handler: com.amazonaws.services.lambda.CreateAthenaPartitionsBasedOnS3EventWithDDB::handleRequest. Choose the existing IAM role, lambda-partition-execution-role.

Choose Next and then Create Lambda.

Interactive analysis using Amazon Athena

In this section, we analyze the historical data that’s been collected since we added the partitions to the Amazon Athena table for data delivered to the preprocessing logs bucket.

Scenario 1 is robot traffic by edge location.

SELECT COUNT(*) AS ct, requestip, location FROM cf_logs
WHERE isbot='True'
GROUP BY requestip, location
ORDER BY ct DESC;

Scenario 2 is total bytes transferred per distribution for each edge location for your website.

SELECT distribution, location, SUM(bytes) as totalBytes
FROM cf_logs
GROUP BY location, distribution;

Scenario 3 is the top 50 viewers of your website.

SELECT requestip, COUNT(*) AS ct  FROM cf_logs
GROUP BY requestip
ORDER BY ct DESC;

Streaming analysis using Amazon Kinesis Analytics

In this section, you deploy a stream processing application using Amazon Kinesis Analytics to analyze the preprocessed Amazon CloudFront log streams. This application analyzes directly from the Amazon Kinesis Stream as it is delivered to the preprocessing logs bucket. The stream queries in section are focused on gaining the following insights:

  • The IP address of the bot, identified by its Amazon CloudFront edge location, that is currently sending requests to your website. The query also includes the total bytes transferred as part of the response.
  • The total bytes served per distribution per population for your website.
  • The top 10 viewers of your website.

To download the firehose-access-policy.json file, type the following.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/kinesisanalytics/firehose-access-policy.json  <path_on_your_local_machine>

To download the kinesisanalytics-policy.json file, type the following.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/kinesisanalytics/assume-kinesisanalytics-policy.json <path_on_your_local_machine>

Before we create the Amazon Kinesis Analytics application, we need to create the IAM role to provide permission for the analytics application to access Amazon Kinesis Firehose stream.

Let’s use the AWS CLI to create the IAM role using the following three steps:

  1. Create the IAM policy(firehose-access-policy) for the Lambda execution role to use.
  2. Create the Lambda execution role (ka-execution-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

Following is the firehose-access-policy.json file, which is the IAM policy used by Kinesis Analytics to read Firehose delivery stream.

{
    "Version": "2012-10-17",
    "Statement": [
      	{
    	"Sid": "AmazonFirehoseAccess",
    	"Effect": "Allow",
    	"Action": [
       	"firehose:DescribeDeliveryStream",
        	"firehose:Get*"
    	],
    	"Resource": [
              "arn:aws:firehose:*:*:deliverystream/CloudFrontLogsToS3”
       ]
     }
}

Following is the assume-kinesisanalytics-policy.json file, to grant Amazon Kinesis Analytics permissions to assume a role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kinesisanalytics.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

To create the IAM policy used by Analytics access role, type the following command.

aws iam create-policy --policy-name firehose-access-policy --policy-document file://<path>/firehose-access-policy.json

To create the Analytics execution role and assign the service to use this role, type the following command.

aws iam attach-role-policy --role-name ka-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/firehose-access-policy

To attach the policy (irehose-access-policy) created to the Analytics execution role (ka-execution-role), type the following command.

aws iam attach-role-policy --role-name ka-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/firehose-access-policy

To deploy the Analytics application, first download the configuration file and then modify ResourceARN and RoleARN for the Amazon Kinesis Firehose input configuration.

"KinesisFirehoseInput": { 
    "ResourceARN": "arn:aws:firehose:<region>:<account-id>:deliverystream/CloudFrontLogsToS3", 
    "RoleARN": "arn:aws:iam:<account-id>:role/ka-execution-role"
}

To download the Analytics application configuration file, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis//kinesisanalytics/kinesis-analytics-app-configuration.json <path_on_your_local_machine>

To deploy the application, type the following command.

aws kinesisanalytics create-application --application-name "cf-log-analysis" --cli-input-json file://<path>/kinesis-analytics-app-configuration.json

To start the application, type the following command.

aws kinesisanalytics start-application --application-name "cf-log-analysis" --input-configuration Id="1.1",InputStartingPositionConfiguration={InputStartingPosition="NOW"}

SQL queries using Amazon Kinesis Analytics

Scenario 1 is a query for detecting bots for sending request to your website detection for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BOT_DETECTION" (requesttime TIME, destribution VARCHAR(16), requestip VARCHAR(64), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "BOT_DETECTION_PUMP" AS INSERT INTO "BOT_DETECTION"
--
SELECT STREAM 
    STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
    "distribution_name" as distribution,
    "request_ip" as requestip, 
    "edge_location" as edgelocation, 
    SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
WHERE "is_bot" = true
GROUP BY "request_ip", "edge_location", "distribution_name",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);

Scenario 2 is a query for total bytes transferred per distribution for each edge location for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BYTES_TRANSFFERED" (requesttime TIME, destribution VARCHAR(16), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "BYTES_TRANSFFERED_PUMP" AS INSERT INTO "BYTES_TRANSFFERED"
-- Bytes Transffered per second per web destribution by edge location
SELECT STREAM 
    STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
    "distribution_name" as distribution,
    "edge_location" as edgelocation, 
    SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
GROUP BY "distribution_name", "edge_location", "request_date",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);

Scenario 3 is a query for the top 50 viewers for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "TOP_TALKERS" (requestip VARCHAR(64), requestcount DOUBLE);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "TOP_TALKERS_PUMP" AS INSERT INTO "TOP_TALKERS"
-- Top Ten Talker
SELECT STREAM ITEM as requestip, ITEM_COUNT as requestcount FROM TABLE(TOP_K_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "CF_LOG_STREAM_001"),
  'request_ip', -- name of column in single quotes
  50, -- number of top items
  60 -- tumbling window size in seconds
  )
);

Conclusion

Following the steps in this blog post, you just built an end-to-end serverless architecture to analyze Amazon CloudFront access logs. You analyzed these both in interactive and streaming mode, using Amazon Athena and Amazon Kinesis Analytics respectively.

By creating a partition in Athena for the logs delivered to a centralized bucket, this architecture is optimized for performance and cost when analyzing large volumes of logs for popular websites that receive millions of requests. Here, we have focused on just three common use cases for analysis, sharing the analytic queries as part of the post. However, you can extend this architecture to gain deeper insights and generate usage reports to reduce latency and increase availability. This way, you can provide a better experience on your websites fronted with Amazon CloudFront.

In this blog post, we focused on building serverless architecture to analyze Amazon CloudFront access logs. Our plan is to extend the solution to provide rich visualization as part of our next blog post.


About the Authors

Rajeev Srinivasan is a Senior Solution Architect for AWS. He works very close with our customers to provide big data and NoSQL solution leveraging the AWS platform and enjoys coding . In his spare time he enjoys riding his motorcycle and reading books.

 

Sai Sriparasa is a consultant with AWS Professional Services. He works with our customers to provide strategic and tactical big data solutions with an emphasis on automation, operations & security on AWS. In his spare time, he follows sports and current affairs.

 

 


Related

Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight