Synchronizing Amazon S3 Buckets Using AWS Step Functions

Post Syndicated from Andy Katz original https://aws.amazon.com/blogs/compute/synchronizing-amazon-s3-buckets-using-aws-step-functions/

Constantin Gonzalez is a Principal Solutions Architect at AWS

In my free time, I run a small blog that uses Amazon S3 to host static content and Amazon CloudFront to distribute it world-wide. I use a home-grown, static website generator to create and upload my blog content onto S3.

My blog uses two S3 buckets: one for staging and testing, and one for production. As a website owner, I want to update the production bucket with all changes from the staging bucket in a reliable and efficient way, without having to create and populate a new bucket from scratch. Therefore, to synchronize files between these two buckets, I use AWS Lambda and AWS Step Functions.

In this post, I show how you can use Step Functions to build a scalable synchronization engine for S3 buckets and learn some common patterns for designing Step Functions state machines while you do so.

Step Functions overview

Step Functions makes it easy to coordinate the components of distributed applications and microservices using visual workflows. Building applications from individual components that each perform a discrete function lets you scale and change applications quickly.

While this particular example focuses on synchronizing objects between two S3 buckets, it can be generalized to any other use case that involves coordinated processing of any number of objects in S3 buckets, or other, similar data processing patterns.

Bucket replication options

Before I dive into the details on how this particular example works, take a look at some alternatives for copying or replicating data between two Amazon S3 buckets:

  • The AWS CLI provides customers with a powerful aws s3 sync command that can synchronize the contents of one bucket with another.
  • S3DistCp is a powerful tool for users of Amazon EMR that can efficiently load, save, or copy large amounts of data between S3 buckets and HDFS.
  • The S3 cross-region replication functionality enables automatic, asynchronous copying of objects across buckets in different AWS regions.

In this use case, you are looking for a slightly different bucket synchronization solution that:

  • Works within the same region
  • Is more scalable than a CLI approach running on a single machine
  • Doesn’t require managing any servers
  • Uses a more finely grained cost model than the hourly-based Amazon EMR approach

You need a scalable, serverless, and customizable bucket synchronization utility.

Solution architecture

Your solution needs to do three things:

  1. Copy all objects from a source bucket into a destination bucket, but leave out objects that are already present, for efficiency.
  2. Delete all "orphaned" objects from the destination bucket that aren’t present on the source bucket, because you don’t want obsolete objects lying around.
  3. Keep track of all objects for #1 and #2, regardless of how many objects there are.

In the beginning, you read in the source and destination buckets as parameters and perform basic parameter validation. Then, you operate two separate, independent loops, one for copying missing objects and one for deleting obsolete objects. Each loop is a sequence of Step Functions states that read in chunks of S3 object lists and use the continuation token to decide in a choice state whether to continue the loop or not.

This solution is based on the following architecture that uses Step Functions, Lambda, and two S3 buckets:

This setup involves no servers, just two main building blocks:

  • Step Functions manages the overall flow of synchronizing the objects from the source bucket with the destination bucket.
  • A set of Lambda functions carry out the individual steps necessary to perform the work, such as validating input, getting lists of objects from source and destination buckets, copying or deleting objects in batches, and so on.

To understand the synchronization flow in more detail, look at the Step Functions state machine diagram for this example.

Walkthrough

Here’s a detailed discussion of how this works.

To follow along, use the code in the sync-buckets-state-machine GitHub repo. The code comes with a ready-to-run deployment script in Python that takes care of all the IAM roles, policies, Lambda functions, and of course the Step Functions state machine deployment using AWS CloudFormation, as well as instructions on how to use it.

Fine print: Use at your own risk

Before I start, here are some disclaimers:

  • Educational purposes only.

    The following example and code are intended for educational purposes only. Make sure that you customize, test, and review it on your own before using any of this in production.

  • S3 object deletion.

    In particular, using the code included below may delete objects on S3 in order to perform synchronization. Make sure that you have backups of your data. In particular, consider using the Amazon S3 Versioning feature to protect yourself against unintended data modification or deletion.

Step Functions execution starts with an initial set of parameters that contain the source and destination bucket names in JSON:

{
    "source":       "my-source-bucket-name",
    "destination":  "my-destination-bucket-name"
}
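As an aside, you can also start such an execution programmatically. The following is a minimal boto3 sketch; the state machine ARN is a hypothetical placeholder, not output from this post's deployment script:

import json
import boto3

sfn = boto3.client("stepfunctions")

response = sfn.start_execution(
    # Hypothetical ARN; substitute the one created by your deployment
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:sync-buckets",
    input=json.dumps({
        "source": "my-source-bucket-name",
        "destination": "my-destination-bucket-name",
    }),
)
print(response["executionArn"])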

Armed with this data, Step Functions execution proceeds as follows.

Step 1: Detect the bucket region

First, you need to know the regions where your buckets reside. In this case, take advantage of the Step Functions Parallel state. This allows you to use a Lambda function get_bucket_location.py inside two different, parallel branches of task states:

  • FindRegionForSourceBucket
  • FindRegionForDestinationBucket

Each task state receives one bucket name as an input parameter, then detects the region corresponding to "their" bucket. The output of these functions is collected in a result array containing one element per parallel function.
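As a hedged sketch (the actual get_bucket_location.py in the repo may use a different event schema), such a function needs only a few lines of boto3:

import boto3

s3 = boto3.client("s3")

def handler(event, context):
    # S3 reports None as the LocationConstraint for buckets in us-east-1
    location = s3.get_bucket_location(Bucket=event["bucket"])["LocationConstraint"]
    return {"bucket": event["bucket"], "region": location or "us-east-1"}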

Step 2: Combine the parallel states

The output of a parallel state is a list with all the individual branches’ outputs. To combine them into a single structure, use a Lambda function called combine_dicts.py in its own CombineRegionOutputs task state. The function combines the two outputs from step 1 into a single JSON dict that provides you with the necessary region information for each bucket.
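In essence, this is only a few lines. The following hedged sketch assumes the branch outputs are flat dicts, which may differ from the repo's actual combine_dicts.py:

def handler(event, context):
    # event is the Parallel state's output: a list with one dict per branch
    combined = {}
    for branch_output in event:
        combined.update(branch_output)
    return combined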

Step 3: Validate the input

In this walkthrough, you only support buckets that reside in the same region, so you need to decide if the input is valid or if the user has given you two buckets in different regions. To find out, use a Lambda function called validate_input.py in the ValidateInput task state that tests if the two regions from the previous step are equal. The output is a Boolean.
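Again, the function body is essentially a single comparison. In this hedged sketch, the input field names are illustrative rather than the repo's actual schema:

def handler(event, context):
    # True only when both buckets reside in the same region
    return event["sourceRegion"] == event["destinationRegion"]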

Step 4: Branch the workflow

Use another type of Step Functions state, a Choice state, which branches into a Failure state if the comparison in step 3 yields false, or proceeds with the remaining steps if it yields true.

Step 5: Execute in parallel

The actual work is happening in another Parallel state. Both branches of this state are very similar to each other and they re-use some of the Lambda function code.

Each parallel branch implements a looping pattern across the following steps:

  1. Use a Pass state to inject either the string value "source" (InjectSourceBucket) or "destination" (InjectDestinationBucket) into the listBucket attribute of the state document.

    The next step uses either the source or the destination bucket, depending on the branch, while executing the same, generic Lambda function. You don’t need two Lambda functions that differ only slightly. This step illustrates how to use Pass states as a way of injecting constant parameters into your state machine and as a way of controlling step behavior while re-using common step execution code.

  2. The next step UpdateSourceKeyList/UpdateDestinationKeyList lists objects in the given bucket.

    Remember that the previous step injected either "source" or "destination" into the state document’s listBucket attribute. This step uses the same list_bucket.py Lambda function to list objects in an S3 bucket; the listBucket attribute of its input decides which bucket to list. The left branch of the main parallel state uses the list of source objects to work through copying missing objects. The right branch uses the list of destination objects to check whether each has a corresponding object in the source bucket, and to eliminate any orphaned objects: orphans have no source object with the same S3 key.

  3. This step performs the actual work. In the left branch, the CopySourceKeys step uses the copy_keys.py Lambda function to go through the list of source objects provided by the previous step, then copies any missing object into the destination bucket. Its sister step in the other branch, DeleteOrphanedKeys, uses its destination bucket key list to test whether each object from the destination bucket has a corresponding source object, then deletes any orphaned objects.

  4. The S3 ListObjects API action is designed to be scalable across many objects in a bucket. Therefore, it returns object lists in chunks of configurable size, along with a continuation token. If the API result has a continuation token, it means that there are more objects in this list. You can work from token to token to continue getting object list chunks, until you get no more continuation tokens.
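Putting steps 2 and 3 of this loop together, here is a hedged sketch of the list-and-copy pattern with continuation tokens. It is simplified relative to the repo's list_bucket.py and copy_keys.py; the function names and the existence check are illustrative:

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")

def list_chunk(bucket, token=None):
    # Return one chunk of keys, plus the continuation token if there is more
    kwargs = {"Bucket": bucket, "MaxKeys": 1000}
    if token:
        kwargs["ContinuationToken"] = token
    response = s3.list_objects_v2(**kwargs)
    keys = [obj["Key"] for obj in response.get("Contents", [])]
    return keys, response.get("NextContinuationToken")

def copy_missing(source, destination, keys):
    # Copy each key that doesn't already exist in the destination bucket
    for key in keys:
        try:
            s3.head_object(Bucket=destination, Key=key)
        except ClientError as error:
            if error.response["Error"]["Code"] == "404":
                s3.copy_object(Bucket=destination, Key=key,
                               CopySource={"Bucket": source, "Key": key})
            else:
                raise

Each UpdateSourceKeyList execution corresponds to one list_chunk call; the continuation token travels in the state document so that the Choice state in step 6 can decide whether another loop iteration is needed. A fuller implementation might also compare sizes or ETags before skipping a copy.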

By breaking down large amounts of work into chunks, you can make sure each chunk is completed within the timeframe allocated for the Lambda function, and within the maximum input/output data size for a Step Functions state.

This approach comes with a tradeoff: the more objects you process per chunk, the faster you finish, because there is less overhead for managing individual chunks. On the other hand, if you process too many objects within the same chunk, you risk exceeding the time and space limits of the processing Lambda function or the Step Functions state, and the work cannot be completed.

In this particular case, use a Lambda function that maximizes the number of objects listed from the S3 bucket that can be stored in the input/output state data. This is currently up to 32,768 bytes, assuming (based on some experimentation) that the execution of the COPY/DELETE requests in the processing states can always complete in time.

A more sophisticated approach would use the Step Functions retry/catch state attributes to detect when a time limit is hit and adjust the list size accordingly.

Step 6: Test for completion

Because the presence of a continuation token in the S3 ListObjects output signals that you are not done processing all objects yet, use a Choice state to test for its presence. If a continuation token exists, it branches into the UpdateSourceKeyList step, which uses the token to get to the next chunk of objects. If there is no token, you’re done. The state machine then branches into the FinishCopyBranch/FinishDeleteBranch state.

By using Choice states like this, you can create loops exactly like the old times, when you didn’t have for statements and used branches in assembly code instead!

Step 7: Success!

Finally, you’re done, and can step into your final Success state.

Lessons learned

When implementing this use case with Step Functions and Lambda, I learned the following things:

  • Sometimes, it is necessary to manipulate the JSON state of a Step Functions state machine with just a few lines of code that hardly seem to warrant their own Lambda function. This is OK, and the cost is actually pretty low given Lambda’s 100-millisecond billing granularity. The upside is that functions like these help make the data more palatable for the following steps or for facilitating Choice states. An example here is the combine_dicts.py function.
  • Pass states can be useful beyond debugging and tracing: they can inject arbitrary values into your state JSON and guide generic Lambda functions into doing specific things.
  • Choice states are your friend because you can build while-loops with them. This allows you to reliably grind through large amounts of data with the patience of an engine that currently supports execution times of up to 1 year.

    Currently, there is an execution history limit of 25,000 events. Each Lambda task state execution takes up 5 events, while each choice state takes 2 events for a total of 7 events per loop. This means you can loop about 3500 times with this state machine. For even more scalability, you can split up work across multiple Step Functions executions through object key sharding or similar approaches.

  • It’s not necessary to spend a lot of time coding exception handling within your Lambda functions. You can delegate all exception handling to Step Functions and instead simplify your functions as much as possible.

  • Step Functions are great replacements for shell scripts. This could have been a shell script, but then I would have had to worry about where to execute it reliably, how to scale it if it went beyond a few thousand objects, etc. Think of Step Functions and Lambda as tools for scripting at a cloud level, beyond the boundaries of servers or containers. "Serverless" here also means "boundary-less".

Summary

This approach breaks down any number of S3 objects into chunks, then uses Step Functions control logic to work through them in a scalable, serverless, and fully managed way.

To take a look at the code or tweak it for your own needs, use the code in the sync-buckets-state-machine GitHub repo.

To see more examples, please visit the Step Functions Getting Started page.

Enjoy!

Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3

Post Syndicated from Illya Yalovyy original https://aws.amazon.com/blogs/big-data/seven-tips-for-using-s3distcp-on-amazon-emr-to-move-data-efficiently-between-hdfs-and-amazon-s3/

Have you ever needed to move a large amount of data between Amazon S3 and Hadoop Distributed File System (HDFS) but found that the data set was too large for a simple copy operation? EMR can help you with this. In addition to processing and analyzing petabytes of data, EMR can move large amounts of data.

In the Hadoop ecosystem, DistCp is often used to move data. DistCp provides a distributed copy capability built on top of a MapReduce framework. S3DistCp is an extension to DistCp that is optimized to work with S3 and that adds several useful features. In addition to moving data between HDFS and S3, S3DistCp is also a Swiss Army knife of file manipulations. In this post we’ll cover the following tips for using S3DistCp, starting with basic use cases and then moving to more advanced scenarios:

1. Copy or move files without transformation
2. Copy and change file compression on the fly
3. Copy files incrementally
4. Copy multiple folders in one job
5. Aggregate files based on a pattern
6. Upload files larger than 1 TB in size
7. Submit an S3DistCp step to an EMR cluster

1. Copy or move files without transformation

We’ve observed that customers often use S3DistCp to copy data from one storage location to another, whether S3 or HDFS. Syntax for this operation is simple and straightforward:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table

The source location may contain extra files that we don’t necessarily want to copy. Here, we can use filters based on regular expressions to do things such as copying files with the .log extension only.

Each subfolder has the following files:

$ hadoop fs -ls /data/incoming/hourly_table/2017-02-01/03
Found 8 items
-rw-r--r--   1 hadoop hadoop     197850 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.25845.log
-rw-r--r--   1 hadoop hadoop     484006 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.32953.log
-rw-r--r--   1 hadoop hadoop     868522 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.62649.log
-rw-r--r--   1 hadoop hadoop     408072 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.64637.log
-rw-r--r--   1 hadoop hadoop    1031949 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.70767.log
-rw-r--r--   1 hadoop hadoop     368240 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.89910.log
-rw-r--r--   1 hadoop hadoop     437348 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/2017-02-01.03.96053.log
-rw-r--r--   1 hadoop hadoop        800 2017-02-19 03:41 /data/incoming/hourly_table/2017-02-01/03/processing.meta

To copy only the required files, let’s use the --srcPattern option:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table_filtered --srcPattern .*\.log

After the upload has finished successfully, let’s check the folder contents in the destination location to confirm only the files ending in .log were copied:

$ hadoop fs -ls s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03
-rw-rw-rw-   1     197850 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.25845.log
-rw-rw-rw-   1     484006 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.32953.log
-rw-rw-rw-   1     868522 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.62649.log
-rw-rw-rw-   1     408072 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.64637.log
-rw-rw-rw-   1    1031949 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.70767.log
-rw-rw-rw-   1     368240 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.89910.log
-rw-rw-rw-   1     437348 2017-02-19 22:56 s3://my-tables/incoming/hourly_table_filtered/2017-02-01/03/2017-02-01.03.96053.log

Sometimes, data needs to be moved instead of copied. In this case, we can use the --deleteOnSuccess option. This option is similar to aws s3 mv, which you might have used previously with the AWS CLI. The files are first copied and then deleted from the source:

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table --dest s3://my-tables/incoming/hourly_table_archive --deleteOnSuccess

After the preceding operation, the source location has only empty folders, and the target location contains all files.

$ hadoop fs -ls -R s3://my-tables/incoming/hourly_table/2017-02-01/
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/00
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/01
...
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/21
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/incoming/hourly_table/2017-02-01/22


$ hadoop fs -ls s3://my-tables/incoming/hourly_table_archive/2017-02-01/01
-rw-rw-rw-   1     676756 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/2017-02-01.01.27047.log
-rw-rw-rw-   1     780197 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/2017-02-01.01.59789.log
-rw-rw-rw-   1    1041789 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/2017-02-01.01.82293.log
-rw-rw-rw-   1        400 2017-02-19 23:27 s3://my-tables/incoming/hourly_table_archive/2017-02-01/01/processing.meta

The important things to remember here are that S3DistCp deletes only files when --deleteOnSuccess is specified, and that it doesn’t delete parent folders, even when they are empty.

2. Copy and change file compression on the fly

Raw files often land in S3 or HDFS in an uncompressed text format. This format is suboptimal both for the cost of storage and for running analytics on that data. S3DistCp can help you efficiently store data and compress files on the fly with the --outputCodec option:

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table_filtered --dest s3://my-tables/incoming/hourly_table_gz --outputCodec=gz

The current version of S3DistCp supports the codecs gzip, gz, lzo, lzop, and snappy, and the keywords none and keep (the default). These keywords have the following meaning:

  • "none" – Save files uncompressed. If the files are compressed, then S3DistCp decompresses them.
  • "keep" – Don’t change the compression of the files but copy them as-is.

Let’s check the files in the target folder, which have now been compressed with the gz codec:

$ hadoop fs -ls s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/
Found 3 items
-rw-rw-rw-   1     78756 2017-02-20 00:07 s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/2017-02-01.01.27047.log.gz
-rw-rw-rw-   1     80197 2017-02-20 00:07 s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/2017-02-01.01.59789.log.gz
-rw-rw-rw-   1    121178 2017-02-20 00:07 s3://my-tables/incoming/hourly_table_gz/2017-02-01/01/2017-02-01.01.82293.log.gz

3. Copy files incrementally

In real life, an upstream process drops files at some cadence; for instance, new files might be created every hour or every minute. A downstream process can be configured to pick them up on a different schedule.

Let’s say data lands on S3 and we want to process it on HDFS daily. Copying all files every time doesn’t scale very well. Fortunately, S3DistCp has a built-in solution for that.

For this solution, we use a manifest file. That file allows S3DistCp to keep track of copied files. Following is an example of the command:

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table --dest s3://my-tables/processing/hourly_table --srcPattern .*\.log --outputManifest=manifest-2017-02-25.gz --previousManifest=s3://my-tables/processing/hourly_table/manifest-2017-02-24.gz

The command takes two manifest files as parameters, --outputManifest and --previousManifest. The first contains a list of all copied files (old and new), and the second contains a list of files copied previously. This way, we can recreate the full history of operations and see what files were copied during each run:

$ hadoop fs -text s3://my-tables/processing/hourly_table/manifest-2017-02-24.gz > previous.lst
$ hadoop fs -text s3://my-tables/processing/hourly_table/manifest-2017-02-25.gz > current.lst
$ diff previous.lst current.lst
2548a2549,2550
> {"path":"s3://my-tables/processing/hourly_table/2017-02-25/00/2017-02-15.00.50958.log","baseName":"2017-02-25/00/2017-02-15.00.50958.log","srcDir":"s3://my-tables/processing/hourly_table","size":610308}
> {"path":"s3://my-tables/processing/hourly_table/2017-02-25/00/2017-02-25.00.93423.log","baseName":"2017-02-25/00/2017-02-25.00.93423.log","srcDir":"s3://my-tables/processing/hourly_table","size":178928}

S3DistCp first creates the manifest file in the local file system, using the provided path (/tmp/mymanifest.gz in this case); when the copy operation finishes, it moves that manifest to the destination location.

4. Copy multiple folders in one job

Imagine that we need to copy several folders. Usually, we run as many copy jobs as there are folders to copy. With S3DistCp, the copy can be done in one go. All we need is to prepare a file with a list of prefixes and use it as a parameter for the tool:

$ s3-dist-cp --src s3://my-tables/incoming/hourly_table_filtered --dest s3://my-tables/processing/sample_table --srcPrefixesFile file://${PWD}/folders.lst

In this case, the folders.lst file contains the following prefixes:

$ cat folders.lst
s3://my-tables/incoming/hourly_table_filtered/2017-02-10/11
s3://my-tables/incoming/hourly_table_filtered/2017-02-19/02
s3://my-tables/incoming/hourly_table_filtered/2017-02-23

As a result, the target location has only the requested subfolders:

$ hadoop fs -ls -R s3://my-tables/processing/sample_table
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-10
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-10/11
-rw-rw-rw-   1     139200 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-10/11/2017-02-10.11.12980.log
...
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-19
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-19/02
-rw-rw-rw-   1     702058 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-19/02/2017-02-19.02.19497.log
-rw-rw-rw-   1     265404 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-19/02/2017-02-19.02.26671.log
...
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-23
drwxrwxrwx   -          0 1970-01-01 00:00 s3://my-tables/processing/sample_table/2017-02-23/00
-rw-rw-rw-   1     310425 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-23/00/2017-02-23.00.10061.log
-rw-rw-rw-   1    1030397 2017-02-24 05:59 s3://my-tables/processing/sample_table/2017-02-23/00/2017-02-23.00.22664.log
...

5. Aggregate files based on a pattern

Hadoop is optimized for reading a smaller number of large files rather than many small files, whether from S3 or HDFS. You can use S3DistCp to aggregate small files into fewer large files of a size that you choose, which can optimize your analysis for both performance and cost.

In the following example, we combine small files into bigger files. We do so by using a regular expression with the --groupBy option.

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/processing/daily_table --targetSize=10 --groupBy='.*/hourly_table/.*/(\d\d)/.*\.log'

Let’s take a look into the target folders and compare them to the corresponding source folders:

$ hadoop fs -ls /data/incoming/hourly_table/2017-02-22/05/
Found 8 items
-rw-r--r--   1 hadoop hadoop     289949 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.11125.log
-rw-r--r--   1 hadoop hadoop     407290 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.19596.log
-rw-r--r--   1 hadoop hadoop     253434 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.30135.log
-rw-r--r--   1 hadoop hadoop     590655 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.36531.log
-rw-r--r--   1 hadoop hadoop     762076 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.47822.log
-rw-r--r--   1 hadoop hadoop     489783 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.80518.log
-rw-r--r--   1 hadoop hadoop     205976 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/2017-02-22.05.99127.log
-rw-r--r--   1 hadoop hadoop        800 2017-02-19 06:07 /data/incoming/hourly_table/2017-02-22/05/processing.meta


$ hadoop fs -ls s3://my-tables/processing/daily_table/2017-02-22/05/
Found 2 items
-rw-rw-rw-   1   10541944 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/054
-rw-rw-rw-   1   10511516 2017-02-28 05:16 s3://my-tables/processing/daily_table/2017-02-22/05/055

As you can see, seven data files were combined into two files, each close to the requested 10 MB target size. The *.meta file was filtered out, because the --groupBy pattern works in a similar way to --srcPattern. We recommend keeping files larger than the default block size, which is 128 MB on EMR.

The name of the final file is composed of groups in the regular expression used in --groupBy plus some number to make the name unique. The pattern must have at least one group defined.

Let’s consider one more example. This time, we want the file name to be formed from three parts: year, month, and file extension (.log in this case). Here is an updated command:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/processing/daily_table_2017 --targetSize=10 --groupBy='.*/hourly_table/.*(2017-).*/(\d\d)/.*\.(log)'

Now we have final files named in a different way:

$ hadoop fs -ls s3://my-tables/processing/daily_table_2017/2017-02-22/05/
Found 2 items
-rw-rw-rw-   1   10541944 2017-02-28 05:16 s3://my-tables/processing/daily_table_2017/2017-02-22/05/2017-05log4
-rw-rw-rw-   1   10511516 2017-02-28 05:16 s3://my-tables/processing/daily_table_2017/2017-02-22/05/2017-05log5

As you can see, the names of the final files are a concatenation of the three groups from the regular expression: (2017-), (\d\d), and (log).

You might find that occasionally you get an error that looks like the following:

$ s3-dist-cp --src /data/incoming/hourly_table --dest s3://my-tables/processing/daily_table_2017 --targetSize=10 --groupBy='.*/hourly_table/.*(2018-).*/(\d\d)/.*\.(log)'
...
17/04/27 15:37:45 INFO s3distcp.S3DistCp: Created 0 files to copy 0 files
...
Exception in thread "main" java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:705)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...

In this case, the key information is contained in Created 0 files to copy 0 files. S3DistCp didn’t find any files to copy because the regular expression in the --groupBy option doesn’t match any files in the source location.

The reason for this issue varies. For example, it can be a mistake in the specified pattern: in the preceding example, we don’t have any files for the year 2018. Another common reason is incorrect escaping of the pattern when we submit the S3DistCp command as a step, which is addressed later in this post.

6. Upload files larger than 1 TB in size

The default upload chunk size when doing an S3 multipart upload is 128 MB. At that part size, a file of about 1.28 TB already requires 10,000 parts, the maximum that S3 allows for a single multipart upload, so files larger than 1 TB quickly approach that limit. Such a large number of parts can make the job run for a very long time or even fail.

In this case, you can improve job performance by increasing the size of each part. In S3DistCp, you can do this by using the --multipartUploadChunkSize option.
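The part arithmetic is easy to check with a quick, illustrative helper (not part of S3DistCp):

import math

def parts_needed(file_size_mb, chunk_size_mb):
    # Number of multipart upload parts for a file of the given size
    return math.ceil(file_size_mb / chunk_size_mb)

print(parts_needed(1_500_000, 128))   # 1.5 TB file at 128 MB parts: 11719 (over the 10,000-part limit)
print(parts_needed(1_500_000, 1000))  # same file at 1000 MB parts: 1500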

Let’s test how it works on several files about 200 GB in size. With the default part size, it takes about 84 minutes to copy them to S3 from HDFS.

We can increase the default part size to 1000 MB:

$ time s3-dist-cp --src /data/gb200 --dest s3://my-tables/data/S3DistCp/gb200_2 --multipartUploadChunkSize=1000
...
real    41m1.616s

The maximum part size is 5 GB. Keep in mind that larger parts have a higher chance to fail during upload and don’t necessarily speed up the process. Let’s run the same job with the maximum part size:

$ time s3-dist-cp --src /data/gb200 --dest s3://my-tables/data/S3DistCp/gb200_2 --multipartUploadChunkSize=5000
...
real    40m17.331s

7. Submit an S3DistCp step to an EMR cluster

You can run the S3DistCp tool in several ways. First, you can SSH to the master node and execute the command in a terminal window as we did in the preceding examples. This approach is convenient for many use cases, but sometimes you want to create a cluster that already has some data on HDFS. You can do this by submitting a step directly in the AWS Management Console when creating a cluster.

In the console Add step dialog box, we can fill in the fields in the following way:

  • Step type: Custom JAR
  • Name*: S3DistCp Step
  • JAR location: command-runner.jar
  • Arguments: s3-dist-cp --src s3://my-tables/incoming/hourly_table --dest /data/input/hourly_table --targetSize 10 --groupBy .*/hourly_table/.*(2017-).*/(\d\d)/.*\.(log)
  • Action on failure: Continue

Notice that we didn’t add quotation marks around our pattern. We needed quotation marks when we were using bash in the terminal window, but not here. The console takes care of escaping and transferring our arguments to the command on the cluster.

Another common use case is to run S3DistCp recurrently or on some event. We can always submit a new step to an existing cluster. The syntax here is slightly different than in the previous examples: we separate arguments by commas, and in the case of a complex pattern, we wrap the whole step option in single quotation marks:

aws emr add-steps --cluster-id j-ABC123456789Z --steps 'Name=LoadData,Jar=command-runner.jar,ActionOnFailure=CONTINUE,Type=CUSTOM_JAR,Args=s3-dist-cp,--src,s3://my-tables/incoming/hourly_table,--dest,/data/input/hourly_table,--targetSize,10,--groupBy,.*/hourly_table/.*(2017-).*/(\d\d)/.*\.(log)'
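The same step can also be submitted from code. Here is a hedged boto3 equivalent of the preceding CLI command (the cluster ID is the same placeholder):

import boto3

emr = boto3.client("emr")

emr.add_job_flow_steps(
    JobFlowId="j-ABC123456789Z",  # placeholder cluster ID
    Steps=[{
        "Name": "LoadData",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "s3-dist-cp",
                "--src", "s3://my-tables/incoming/hourly_table",
                "--dest", "/data/input/hourly_table",
                "--targetSize", "10",
                "--groupBy", ".*/hourly_table/.*(2017-).*/(\\d\\d)/.*\\.(log)",
            ],
        },
    }],
)

Because the arguments are passed as a list, no shell quoting of the --groupBy pattern is needed here either.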

Summary

This post showed you the basics of how S3DistCp works and highlighted some of its most useful features. It covered how you can use S3DistCp to optimize for raw files of different sizes and also selectively copy different files between locations. We also looked at several options for using the tool from SSH, the AWS Management Console, and the AWS CLI.

If you have questions or suggestions, leave a message in the comments.


Next Steps

Take your new knowledge to the next level! Click on the post below and learn the top 10 tips to improve query performance in Amazon Athena.

Top 10 Performance Tuning Tips for Amazon Athena


About the Author

Illya Yalovyy is a Senior Software Development Engineer with Amazon Web Services. He works on cutting-edge features of EMR and is heavily involved in open source projects such as Apache Hive, Apache ZooKeeper, and Apache Sqoop. His spare time is completely dedicated to his children and family.