Tag Archives: Analytics

How Amazon Transportation Service enabled near-real-time event analytics at petabyte scale using AWS Glue with Apache Hudi

Post Syndicated from Madhavan Sriram original https://aws.amazon.com/blogs/big-data/how-amazon-transportation-service-enabled-near-real-time-event-analytics-at-petabyte-scale-using-aws-glue-with-apache-hudi/

This post is co-written with Madhavan Sriram and Diego Menin from Amazon Transportation Services (ATS).

The transportation and logistics industry covers a wide range of services, such as multi-modal transportation, warehousing, fulfillment, freight forwarding, and delivery. At Amazon Transportation Service (ATS), the lifecycle of each shipment is digitally tracked and generates tens of tracking updates on average. Those tracking updates are vital to kick off events throughout the shipment's operational and billing lifecycle, including delay identification and route optimization. They are also the basis for the customer and consumer tracking experience across the different touchpoints.

In this post, we discuss how ATS enabled near-real-time event analytics at petabyte scale using Apache Hudi tables created by AWS Glue Spark jobs.

ATS was looking for ways to securely and cost-efficiently manage and derive analytical insights over petabyte-sized datasets, with data coming in from different sources at different paces and stored across different storage solutions. You can gain deeper and richer insights when you bring together all your relevant data, of all structures and types and from all sources, to analyze.

One of the main challenges that our data engineering team at ATS faced was bringing together all the data arriving in real time and building a holistic view for our customers and partners. The majority of the orders placed through Amazon, one of the world's largest online retailers, are operationalized by ATS for transportation and logistics. ATS provides the business with accurate and timely package delivery. ATS operations generate data at petabyte scale, so having the data available at their fingertips provides innumerable opportunities to improve operations through data-driven decision-making.

Apache Hudi is an open-source data management framework used to simplify incremental data processing and data pipeline development. This framework more efficiently manages business requirements like data lifecycles and improves data quality. Hudi enables you to manage data at the record level in Amazon Simple Storage Service (Amazon S3) data lakes to simplify change data capture (CDC) and streaming data ingestion at petabyte scale, and helps handle data privacy use cases requiring record-level updates and deletes.

Solution overview

One of the biggest challenges ATS faced was handling data at petabyte scale with the need for constant inserts, updates, and deletes with minimal time delay, which reflects real business scenarios and package movement to downstream data consumers.

Their traditional data warehouses couldn’t scale to the size of the data nor the frequency of data ingestion. They needed to scale to hundreds of GBs of data across multiple data ingestion sources in order to derive near-real-time data for downstream consumers to use for data analytics that powered business-critical reports, dashboards, and visualizations. The data is also used for training machine learning models with overall service level agreements (SLAs) of 15 minutes for data ingestion and processing.

In this post, we show how we ingest data in real time on the order of hundreds of GBs per hour and run inserts, updates, and deletes on a petabyte-scale data lake using Apache Hudi tables loaded using AWS Glue Spark jobs and other AWS serverless services, including AWS Lambda, Amazon Kinesis Data Firehose, and Amazon DynamoDB. AWS ProServe, working closely with ATS, built a data lake comprising Apache Hudi tables on Amazon S3 created and populated using AWS Glue. A data pipeline was created that supports inserts, updates, and deletes at petabyte scale on the Apache Hudi tables using AWS Glue. To support real-time ingestion, ATS also implemented a real-time data ingestion pipeline based on Kinesis Data Firehose, DynamoDB, and Amazon DynamoDB Streams.

To tackle the challenges we discussed, we decided to follow the “Divide et Impera” approach, and define two separate workstreams:

  • Stream-based – We ingested data from four different data sources and 11 datasets, and performed some initial data transformation and join steps, honoring a time window that may vary from 3 hours to 2 weeks across all workloads. The event rate might go up to thousands of events per second, and events might have duplicates, arrive late, or not be in the correct order. Our objective was to understand in real time the transit status of a given package or truck, capture the current status of ATS operations in real time, and extend the current stream-based solution to offload and supplement the current extract, transform, and load (ETL) solution, based on Amazon Redshift.
  • Data lake – We wanted the ability to store petabytes of data and allow for merges between historical data (petabytes) with newly ingested data. The data retention policy extends to up to 5 years, which brings increased costs and reduces performance significantly. Our team requires access to near-real-time data (less than 15 minutes) from stream-based ingestion, with full GDPR compliance. Our objective was to merge stream-based ingested data files to derive a holistic view of the dataset at a certain point in time, with an SLA of under 15 minutes. Data lineage capabilities would also be nice to have.

Stream-based solution

The following diagram illustrates the architecture of our stream-based solution.

The flow of the solution is as follows:

  1. Data is ingested from various sources into separate Firehose delivery streams, buffered for up to 15 minutes, and stored in S3 buckets.
  2. Upon the arrival of every new file in Amazon S3, a Lambda function is triggered to insert data into a DynamoDB table associated with a specific data source or dataset (a minimal sketch follows this list).
  3. With DynamoDB Streams, we trigger a second Lambda function that aggregates data in real time across the different DynamoDB tables by performing real-time DynamoDB table lookups. The ETL window is enforced using DynamoDB item TTL, so data is automatically deleted from the table after the TTL period expires.
  4. After it’s transformed, data is collected in Amazon S3 passing through a Firehose delivery stream and is ready to be ingested into our data lake.

The solution allows us to do the following:

  • Ingest data in parallel, in real time, and at the desired scale from all the data sources
  • Scale on demand, and with minimal human operational overhead; this is achieved using an AWS Serverless technology stack
  • Implement our desired time window on a per-item basis, reducing costs and the total amount of data stored
  • Implement ETL using Lambda functions in Python, thereby providing a tighter grasp over expressing the business logic
  • Access data on Amazon S3 before it’s ingested into our data lake, and allow customers and partners to consume data in raw format if needed

The data present in Amazon S3 represents the starting point for a seamless data lake integration.

Data lake ingestion

Moving into our data lake, the following diagram illustrates our architecture for data lake ingestion.

The core implementation in this architecture is the AWS Glue Spark ingestion job for the Hudi table; it represents the entry point for the incremental data processing pipeline.

The AWS Glue Spark job runs with a concurrency of 1 and contains the logic for upserts and deletes, applied sequentially to the Hudi table. Sequencing deletes after upserts ensures that data consistency is maintained even in the case of job reruns.

To use Apache Hudi v0.7 on AWS Glue jobs using PySpark, we imported the following libraries in the AWS Glue jobs, extracted locally from the master node of Amazon EMR:

  • hudi-spark-bundle_2.11-0.7.0-amzn-1.jar
  • spark-avro_2.11-2.4.7-amzn-1.jar

We recommend using AWS Glue 3.0 with the Apache Hudi 0.9.0 connector, rather than importing the Hudi v0.7 JAR files from Amazon EMR, for seamless integration and access to more capabilities and features.

Before we insert data into the Hudi table, we prepare it for the push. To optimize for incremental merge, we take a fixed lookup window based on business use case considerations. We start by reading historical data in a given time window. See the following code:

# HUDI DATA READ
read_options = {
  'hoodie.datasource.query.type': 'snapshot'
}


# HUDI DATAFRAME created from the target Hudi table on S3
hudi_df = spark. \
  read. \
  format("hudi"). \
  options(**read_options). \
  load(config['target'] + "////*")

# Read the historical dataset; load(basePath) uses the "/partitionKey=partitionValue" folder structure for Spark auto partition discovery

# input_df is the INCREMENTAL DATAFRAME created from incrementally ingested data on S3
input_df = spark.read.format("csv").options(header='true').load(config['incremental_path'])



window_year, window_month, window_day = year_month_day_window()
window_filter_clause = "year >= {} AND month >= {} and day >= {}".format(window_year, window_month, window_day)

# We merge it with the incoming newly available data: 

# Data from the Hudi table on S3. Because our use case is global, id alone is unique; otherwise id + partitionPath would be unique.
hudi_s3_df = hudi_df.select(col("node_id"),col("container_label"),col(config['sort_key'])).filter(window_filter_clause)

# Perform a left outer join between new data (input_df) and data present in S3 Hudi. (hudi_s3_df)
hudi_join_df = input_df.alias("incomingData").join(hudi_s3_df.alias("S3HudiData"), (input_df.node_id == hudi_s3_df.node_id) & (input_df.container_label == hudi_s3_df.container_label), "leftouter")

# As it's a Left Outer join, there might be new records that aren't present in S3 Hudi.

hudi_new_df = hudi_join_df.filter(col("S3HudiData.last_update_row_epoch_seconds").isNull()).drop(col("S3HudiData.node_id")).drop(col("S3HudiData.container_label")).drop(col("S3HudiData.last_update_row_epoch_seconds"))

# As it's a Left Outer join, Select the records where input_df.last_update_time > hudi_s3_df.last_update_time. 

hudi_updated_df = hudi_join_df.filter(col("S3HudiData.last_update_row_epoch_seconds").isNotNull() & (col("incomingData.last_update_row_epoch_seconds") > col("S3HudiData.last_update_row_epoch_seconds"))).drop(col("S3HudiData.node_id")).drop(col("S3HudiData.container_label")).drop(col("S3HudiData.last_update_row_epoch_seconds"))
hudi_final_df = hudi_new_df.union(hudi_updated_df)

After we prepare the data to be pushed to the Hudi table, we implement the Hudi table update using the following code:

(hudi_final_df.write.format(HUDI_FORMAT)
.option(TABLE_NAME, config['hudi_table_name'])
.option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
.option(PARTITIONPATH_FIELD_OPT_KEY,config["partition_keys"])
.option(KEYGENERATOR_CLASS_OPT_KEY, COMPLEX_KEYGENERATOR_CLASS_OPT_VAL)
.option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
.option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
.option(UPSERT_PARALLELISM, 1500)
.option('hoodie.payload.ordering.field',config["sort_key"])
.option(PAYLOAD_CLASS_OPT_KEY,'org.apache.hudi.common.model.DefaultHoodieRecordPayload')
.option(HIVE_PARTITION_FIELDS_OPT_KEY, config["partition_keys"])
.option(HIVE_DATABASE_OPT_KEY,config['hudi_database'])
.option(HIVE_TABLE_OPT_KEY,config['hudi_table_name'])
.option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
.option(HIVE_JDBC_SYNC,"false")
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL)
.option('hoodie.datasource.write.hive_style_partitioning', 'true')
# To switch to Global Bloom index, set the following configuration as GLOBAL_BLOOM.
.option('hoodie.index.type', 'GLOBAL_SIMPLE') 
.option('hoodie.simple.index.update.partition.path', 'true')
.option('hoodie.global.simple.index.parallelism', '500')
.mode("append")
.save(config['target']))

In the preceding code, config is a dictionary that includes all the Apache Hudi configurations. The AWS Glue Data Catalog is automatically synced after the Hudi table creation, as part of the AWS Glue job, reflecting the Amazon S3 partition structure. We can now query the data using Amazon Athena or Amazon Redshift Spectrum.
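
The delete step that runs after the upsert isn't shown above. The following is a minimal sketch of how it might be issued with Hudi's delete operation, assuming a delete_df DataFrame (hypothetical) containing the record key, partition, and precombine columns of the rows to remove, and reusing the same constants and config dictionary as the upsert:

# Apply deletes after the upsert; only the key, partition, and precombine fields are required
(delete_df.write.format(HUDI_FORMAT)
.option(TABLE_NAME, config['hudi_table_name'])
.option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
.option(PARTITIONPATH_FIELD_OPT_KEY, config["partition_keys"])
.option(KEYGENERATOR_CLASS_OPT_KEY, COMPLEX_KEYGENERATOR_CLASS_OPT_VAL)
.option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
.option(OPERATION_OPT_KEY, 'delete')  # Hudi delete operation instead of upsert
.option('hoodie.delete.shuffle.parallelism', 200)
.mode("append")
.save(config['target']))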

To comply with our strict internal ingestion SLA, we had to pay special attention to employing the right Hudi indexes and defining the right table type. For the latter, we analyzed the type of workload. Due to the analytic nature of the datasets and the use case, we identified that the right configuration would be a COPY_ON_WRITE table, which compromised write performance but enhanced read performance.

For the former, we went through an experimentation phase. We started with a GLOBAL_BLOOM index and identified an initial non-linear pattern in data writing performance.

Given the randomness and the time window specified for the input data, we encountered a significant number of false positives, leading to reading the entire dataset back for comparison. Moreover, the write time with GLOBAL_BLOOM keeps increasing with the data size, whereas GLOBAL_SIMPLE doesn't bring this overhead (with a fixed lookup window), as can be observed in the diagram.

The graph plots the total time taken by the AWS Glue Hudi job over days as incoming data is merged with the historical data using GLOBAL_BLOOM. The upper half shows the non-linear increase in runtime observed when the same data was merged consecutively over days; the lower half shows a steep linear increase when new incoming data was merged with the historical data.

GLOBAL_BLOOM wasn’t appropriate for our use case because the historical data spanned back 5 years, and the AWS Glue job wouldn’t be able to meet the SLA demands. At this point, we investigated GLOBAL_SIMPLE indexes and reached the expected performance patterns.

Our data lake solution allows us to do the following:

  • Ingest data files in a petabyte-scale data lake, with a 15-minute ingestion SLA from the moment we receive the data
  • Read the data at petabyte scale by leveraging Amazon S3 partitions (created by AWS Glue jobs and mapped to the Hudi partition logic) and perform faster lookups by using Hudi indexes
  • Use Hudi data lineage capabilities
  • Reduce costs for data storage, infrastructure maintenance, and development
  • Manage data governance using AWS Lake Formation, which allows partners and customers to query the data using their own tools, while allowing ATS to retain control over our data

Conclusion

In this post, we highlighted how ATS built a real-time, fully serverless data ingestion platform, scaling up to thousands of events per second and merging with petabyte-sized historical data stored in a data lake in near-real time.

We built a petabyte-scale data lake solution based on Apache Hudi and AWS Glue that allows us to share our data within 15 minutes from ingestion with our partners and consumers, while retaining complete control over our data and automatically offloading costs for data consumption. This provides linear performance as data grows over time.

About Amazon Transportation Service

Amazon Transportation Service (ATS) is the middle mile of Amazon’s transportation network, connecting the fulfillment centers at one end and the delivery stations and post offices at the other end. We enable packages that are ordered and packed at fulfillment centers to traverse the European continent and reach the final delivery station that handles house-to-house delivery.


About the Authors

Madhavan Sriram is a Manager, Data Science, who brings wide experience across multiple enterprise organisations in the space of Big Data and Machine Learning technologies. He currently leads the Data Technology and Products team within Amazon Transportation Services (ATS) and builds data-intensive products for the transportation network within Amazon. In his free time, Madhavan enjoys photography and poetry.

Diego Menin is a Senior Data Engineer within the Data Technology and Products team. He brings wide experience across startups and enterprises, with deep AWS expertise in developing scalable cloud-based data and analytics products. Within Amazon, he is the architect of Amazon’s transportation data lake and works heavily on streaming data and integration mechanisms with downstream applications through the data lake.

Gabriele Cacciola is a Senior Data Architect working for the Professional Service team with Amazon Web Services. Coming from a solid Startup experience, he currently helps enterprise customers across EMEA implement their ideas, innovate using the latest tech and build scalable data and analytics solutions to make critical business decisions. In his free time, Gabriele enjoys football and cooking.

Kunal Gautam is a Senior Big Data Architect at Amazon Web Services. Having experience in building his own startup and working with enterprises, he brings a unique perspective on getting people, business, and technology to work in tandem for customers. He is passionate about helping customers in their digital transformation journey and enables them to build scalable data and advanced analytics solutions to gain timely insights and make critical business decisions. In his spare time, Kunal enjoys marathons, tech meetups, and meditation retreats.

Optimize performance and reduce costs for network analytics with VPC Flow Logs in Apache Parquet format

Post Syndicated from Radhika Ravirala original https://aws.amazon.com/blogs/big-data/optimize-performance-and-reduce-costs-for-network-analytics-with-vpc-flow-logs-in-apache-parquet-format/

VPC Flow Logs help you understand network traffic patterns, identify security issues, audit usage, and diagnose network connectivity on AWS. Until today, VPC flow logs were delivered to Amazon S3 as raw text files in GZIP format. Customers often route their VPC flow logs directly to Amazon Simple Storage Service (Amazon S3) for long-term retention and then use a custom format conversion application to convert these text files into Apache Parquet format to optimize the analytical processing of the log data and reduce the cost of log storage. This custom format conversion step added complexity, time to insight, and cost to VPC flow log traffic analytics.

Today, we’re excited to announce a new feature that delivers VPC flow logs in the Apache Parquet format, making it easier, faster, and more cost-efficient to analyze your VPC flow logs stored in Amazon S3. You can also deliver VPC flow logs to Amazon S3 with Hive-compatible S3 prefixes partitioned by the hour.

Apache Parquet is an open-source file format that stores data efficiently in columnar format, provides different encoding types, and supports predicate filtering. With good compression ratios and efficient encoding, VPC flow logs stored in Parquet reduce your Amazon S3 storage costs. When querying flow logs persisted in Parquet format with analytic frameworks, non-relevant data is skipped, requiring fewer reads on Amazon S3 and thereby improving query performance. To reduce query running time and cost with Amazon Athena and Amazon Redshift Spectrum, Apache Parquet is often the recommended file format.

In this post, we explore this new feature and how it can help you run performant queries on your flow logs with Athena.

Create flow logs with Parquet file format

To take advantage of this feature, simply create a new VPC flow log subscription with Amazon S3 as the destination using the AWS Management Console, AWS Command Line Interface (AWS CLI), or API. On the console, when creating a new VPC flow log subscription with Amazon S3, you can select one or more of the following options:

  • Log file format
  • Hive-compatible S3 prefixes
  • Partition logs by time

We now explore how each of these options can make processing and storage of flow logs more efficient.

Apache Parquet formatted files

By default, your logs are delivered in text format. To change to Parquet, for Log file format, select Parquet. This delivers your VPC flow logs to Amazon S3 in the Apache Parquet format.

Note the following considerations:

  • You can’t change existing flow logs to deliver logs in Parquet format. You need to create a new VPC flow log subscription with Parquet as the log file format.
  • Consider using a higher maximum aggregation interval (10 minutes) when aggregating flow packets to ensure larger Parquet files on Amazon S3.
  • Refer to Amazon CloudWatch pricing for pricing of log delivery in Apache Parquet format for VPC flow logs.

Hive-compatible partitions

Partitioning is a technique to organize your data to improve the efficiency of your query engine. Partitions aligned with the columns that are frequently used in the query filters can significantly lower your query response time. You can now specify that your flow logs be organized in Hive-compatible format. This allows you to run the MSCK REPAIR TABLE command in Athena to quickly and easily add new partitions as they get delivered into Amazon S3. Simply select Enable for Hive-compatible S3 prefix to set this up. This delivers the flow logs to Amazon S3 in the following path:

s3://my-flow-log-bucket/my-custom-flow-logs/AWSLogs/aws-account-id=123456789012/aws-service=vpcflowlogs/aws-region=us-east-1/year=2021/month=10/day=07/123456789012_vpcflowlogs_us-east-1_fl-06a0eeb1087d806aa_20211007T1930Z_d5ab7c14.log.parquet

Per-hour partitions

You can also organize your flow logs at a much more granular level by adding per-hour partitions. You should enable this feature if you constantly need to query large volumes of logs with a specific time frame as the predicate. Querying logs only during certain hours results in less data scanned, which translates to lower cost per query with engines such as Athena and Redshift Spectrum.

You can also set per-hour partitions via an API or the AWS CLI using the --destination-options parameter in create-flow-logs:

aws ec2 create-flow-logs \
--resource-type VPC \
--resource-ids vpc-001122333 \
--traffic-type ALL \
--log-destination-type s3 \
--log-destination arn:aws:s3:::my-flow-log-bucket/my-custom-flow-logs/ \
--destination-options FileFormat=parquet,HiveCompatiblePartitions=True,PerHourPartition=True
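
The following is an equivalent sketch using the API through boto3; the VPC ID and bucket ARN are placeholders:

import boto3

ec2 = boto3.client('ec2')

response = ec2.create_flow_logs(
    ResourceType='VPC',
    ResourceIds=['vpc-001122333'],          # placeholder VPC ID
    TrafficType='ALL',
    LogDestinationType='s3',
    LogDestination='arn:aws:s3:::my-flow-log-bucket/my-custom-flow-logs/',
    DestinationOptions={
        'FileFormat': 'parquet',
        'HiveCompatiblePartitions': True,
        'PerHourPartition': True,
    },
)
print(response['FlowLogIds'])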

The following is a sample flow log file deposited into an hourly bucket. By default, the flow logs in Parquet are compressed using Gzip format, which has the highest compression ratio compared to other compression formats.

s3://my-flow-log-bucket/my-custom-flow-logs/AWSLogs/aws-account-id=123456789012/aws-service=vpcflowlogs/aws-region=us-east-1/year=2021/month=10/day=07/hour=19/123456789012_vpcflowlogs_us-east-1_fl-06a0eeb1087d806aa_20211007T1930Z_d5ab7c14.log.parquet

Query with Athena

You can use the Athena integration for VPC Flow Logs from the Amazon VPC console to automate the Athena setup and query VPC flow logs in Amazon S3. This integration has now been extended to support these new flow log delivery options to Amazon S3.

To demonstrate querying flow logs in Parquet and in plain text in this blog, let’s start from the Amazon Athena console.  We begin by creating an external table pointing to flow logs in Parquet.

Note that this feature supports specifying flow logs fields in Parquet’s native data types. This eliminates the need for you to cast your fields when querying the traffic logs.

Then run MSCK REPAIR TABLE.

Let’s run a sample query on these Parquet-based flow logs.
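
The following is a minimal, illustrative sketch of these steps using the Athena API through boto3. The database, table, bucket names, and column subset are assumptions for illustration; the table generated by the Athena integration may use a different schema.

import time
import boto3

athena = boto3.client('athena')
DATABASE = 'default'                          # illustrative database
OUTPUT = 's3://my-athena-results-bucket/'     # illustrative query results location

# The LOCATION points below the Hive-compatible account/service/region prefix shown earlier,
# so only the year/month/day/hour partitions remain to be discovered.
CREATE_TABLE = """
CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs_parquet (
  version int, account_id string, interface_id string,
  srcaddr string, dstaddr string, srcport int, dstport int,
  protocol bigint, packets bigint, bytes bigint,
  `start` bigint, `end` bigint, action string, log_status string
)
PARTITIONED BY (year string, month string, day string, hour string)
STORED AS PARQUET
LOCATION 's3://my-flow-log-bucket/my-custom-flow-logs/AWSLogs/aws-account-id=123456789012/aws-service=vpcflowlogs/aws-region=us-east-1/'
"""

SAMPLE_QUERY = """
SELECT srcaddr, dstaddr, sum(bytes) AS total_bytes
FROM vpc_flow_logs_parquet
WHERE year = '2021' AND month = '10' AND day = '07' AND hour = '19'
GROUP BY srcaddr, dstaddr
ORDER BY total_bytes DESC
LIMIT 10
"""

def run(sql):
    # Start the query and wait for it to finish before issuing the next one
    qid = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={'Database': DATABASE},
        ResultConfiguration={'OutputLocation': OUTPUT},
    )['QueryExecutionId']
    while True:
        state = athena.get_query_execution(QueryExecutionId=qid)['QueryExecution']['Status']['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            return state
        time.sleep(1)

run(CREATE_TABLE)
run('MSCK REPAIR TABLE vpc_flow_logs_parquet')
run(SAMPLE_QUERY)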

Now, let’s create a table for flow logs delivered in plain text.

We add the partitions using the ALTER TABLE statement in Athena.

Run a simple flow logs query and note the time it took to run the query.

The Athena query run time with flow logs in Parquet (1.16 seconds) is much faster than the run time with flow logs in plain text (2.51 seconds).

For benchmarks that further describe the cost savings and performance improvements from persisting data in Parquet in granular partitions, see Top 10 Performance Tuning Tips for Amazon Athena.

Summary

You can now deliver your VPC flow logs to Amazon S3 with three new options:

  • In Apache Parquet formatted files
  • With Hive-compatible S3 prefixes
  • In hourly partitioned files

These delivery options make it faster, easier, and more cost-efficient to store and run analytics on your VPC flow logs. To learn more, visit VPC Flow Logs documentation. We hope you will give this feature a try and share your experience with us. Please send feedback to the AWS forum for Amazon VPC or through your usual AWS support contacts.


About the Authors

Radhika Ravirala is a Principal Streaming Architect at Amazon Web Services, where she helps customers craft distributed streaming applications using Amazon Kinesis and Amazon MSK. In her free time, she enjoys long walks with her dog, playing board games, and reading widely.

Vaibhav Katkade is a Senior Product Manager in the Amazon VPC team. He is interested in areas of network security and cloud networking operations. Outside of work, he enjoys cooking and the outdoors.

Offloading SQL for Amazon RDS using the Heimdall Proxy

Post Syndicated from Antony Prasad Thevaraj original https://aws.amazon.com/blogs/architecture/offloading-sql-for-amazon-rds-using-the-heimdall-proxy/

Getting the maximum scale from your database often requires fine-tuning the application. This can increase time and incur cost – effort that could be used towards other strategic initiatives. The Heimdall Proxy was designed to intelligently manage SQL connections to help you get the most out of your database.

In this blog post, we demonstrate two SQL offload features offered by this proxy:

  1. Automated query caching
  2. Read/Write split for improved database scale

By leveraging the solution shown in Figure 1, you can save on development costs and accelerate the onboarding of applications into production.

Figure 1. Heimdall Proxy distributed, auto-scaling architecture

Figure 1. Heimdall Proxy distributed, auto-scaling architecture

Why query caching?

For ecommerce websites with high read calls and infrequent data changes, query caching can drastically improve your Amazon Relational Database Service (Amazon RDS) scale. You can use Amazon ElastiCache to serve results. Retrieving data from cache has a shorter access time, which reduces latency and improves I/O operations.

It can take developers considerable effort to create, maintain, and adjust TTLs for cache subsystems. The proxy technology covered in this article has features that allow for automated results caching in a grid cache chosen by the user, without code changes. What makes this solution unique is the distributed, scalable architecture. As your traffic grows, scaling is supported by simply adding proxies. Multiple proxies work together as a cohesive unit for caching and invalidation.

View video: Heimdall Data: Query Caching Without Code Changes

Why Read/Write splitting?

It can be fairly straightforward to configure a primary and read replica instance on the AWS Management Console. But it may be challenging for the developer to implement such a scale-out architecture.

Some of the issues they might encounter include:

  • Replication lag. A query read-after-write may result in data inconsistency due to replication lag. Many applications require strong consistency.
  • DNS dependencies. Due to the DNS cache, many connections can be routed to a single replica, creating uneven load distribution across replicas.
  • Network latency. When deploying Amazon RDS globally using the Amazon Aurora Global Database, it’s difficult for the application to intelligently choose the optimal reader.

The Heimdall Proxy streamlines the ability to elastically scale out read-heavy database workloads. The Read/Write splitting supports:

  • ACID compliance. Determines the replication lag and knows when it is safe to access a database table, ensuring data consistency.
  • Database load balancing. Tracks the status of each DB instance for its health and evenly distributes connections without relying on DNS.
  • Intelligent routing. Chooses the optimal reader to access based on the lowest latency to create local-like response times. Check out our Aurora Global Database blog.

View video: Heimdall Data: Scale-Out Amazon RDS with Strong Consistency

Customer use case: Tornado

Hayden Cacace, Director of Engineering at Tornado

Tornado is a modern web and mobile brokerage that empowers anyone who aspires to become a better investor.

Our engineering team was tasked to upgrade our backend such that it could handle a massive surge in traffic. With a 3-month timeline, we decided to use read replicas to reduce the load on the main database instance.

First, we migrated from Amazon RDS for PostgreSQL to Aurora for Postgres since it provided better data replication speed. But we still faced a problem – the amount of time it would take to update server code to use the read replicas would be significant. We wanted the team to stay focused on user-facing enhancements rather than server refactoring.

Enter the Heimdall Proxy: We evaluated a handful of options for a database proxy that could automatically do Read/Write splits for us with no code changes, and it became clear that Heimdall was our best option. It had the Read/Write splitting “out of the box” with zero application changes required. And it also came with database query caching built-in (integrated with Amazon ElastiCache), which promised to take additional load off the database.

Before the Tornado launch date, our load testing showed the new system handling several times more load than we were able to previously. We were using a primary Aurora Postgres instance and read replicas behind the Heimdall proxy. When the Tornado launch date arrived, the system performed well, with some background jobs averaging around a 50% hit rate on the Heimdall cache. This has really helped reduce the database load and improve the runtime of those jobs.

Using this solution, we now have a data architecture with additional room to scale. This allows us to continue to focus on enhancing the product for all our customers.

Download a free trial from the AWS Marketplace.

Resources

Heimdall Data, based in the San Francisco Bay Area, is an AWS Advanced Tier ISV partner. They have Amazon Service Ready designations for Amazon RDS and Amazon Redshift. Heimdall Data offers a database proxy that offloads SQL improving database scale. Deployment does not require code changes. For other proxy options, consider the Amazon RDS Proxy, PgBouncer, PgPool-II, or ProxySQL.

Simplify your data analysis with Amazon Redshift Query Editor v2

Post Syndicated from Srikanth Sopirala original https://aws.amazon.com/blogs/big-data/simplify-your-data-analysis-with-amazon-redshift-query-editor-v2/

Amazon Redshift is a fast, fully managed cloud data warehouse that provides a web-based query editor in addition to supporting connectivity via ODBC/JDBC or the Redshift Data API. Tens of thousands of customers use Amazon Redshift as their analytics platform. Data analysts, database developers, and data scientists use SQL to analyze their data in Amazon Redshift data warehouses. Amazon Redshift Query Editor v2 is a web-based SQL client application that you can use to author and run queries on your Amazon Redshift data warehouse. You can visualize query results with charts and collaborate by sharing queries with members of your team.

Query Editor v2 provides several capabilities, such as the ability to browse and explore multiple databases, external tables, views, stored procedures, and user-defined functions. It provides wizards to create schemas, tables, and user-defined functions. It simplifies the management and collaboration of saved queries. You can also gain faster insights by visualizing the results with a single click.

Query Editor v2 enhances and builds upon the functionality of the prior version of the query editor, such as increased size of queries, the ability to author and run multi-statement queries, support for session variables, and query parameters, to name a few.

You can provide Query Editor v2 to end-users such as data analysts, database developers, and data scientists without providing the privileges required to access the Amazon Redshift console.

In this post, we walk through how to create an AWS Identity and Access Management (IAM) role to provide access to Query Editor v2 for end-users, easily connect to your clusters, run SQL queries, load data in your clusters, create charts, and share queries directly from the console.

Configure Query Editor v2 for your AWS account

As an admin, you must first configure Query Editor v2 before providing access to your end-users.

You can access Query Editor v2 from the Amazon Redshift console.

When you choose Query Editor v2 from the Editor options, a new tab in your browser opens with the Query Editor v2 interface.

By default, an AWS-owned key is used to encrypt resources. Optionally, you can create a symmetric customer managed key to encrypt Query Editor v2 resources such as saved queries and query results using the AWS Key Management Service (AWS KMS) console or AWS KMS API operations.

Provide access to Query Editor v2 for your end-users

Enterprises want to democratize access to data in the data warehouse securely by providing a web-based query editor to their end-users. You can either use IAM users or integrate the AWS console with your single sign-on (SSO) provider to provide access to end-users. In a future post, we will document all necessary steps to integrate your SSO provider with the query editor.

To enable your users to access Query Editor v2 using IAM, as an administrator, you can attach one of the AWS managed policies described in the following list to the IAM user or role to grant permission. These managed policies also give access to other required services. You can create your custom-managed policy if you want to customize permissions for your end-users.

  • AmazonRedshiftQueryEditorV2FullAccess – Grants full access to Query Editor v2 operations and resources. This is primarily intended for administrators.
  • AmazonRedshiftQueryEditorV2NoSharing – Grants the ability to work with Query Editor v2 without sharing resources. Users can’t share their queries with their team members.
  • AmazonRedshiftQueryEditorV2ReadSharing – Grants the ability to work with Query Editor v2 with limited sharing of resources. The granted principal can read the saved queries shared with its team but can’t update them.
  • AmazonRedshiftQueryEditorV2ReadWriteSharing – Grants the ability to work with Query Editor v2 with sharing of resources. The granted principal can read and update the shared resources with its team.

For example, if you have a group of users as a part of marketing_group, and you want them to collaborate between themselves by sharing their queries, you can create an IAM role for them and assign the AmazonRedshiftQueryEditorV2ReadSharing policy. You can also tag the role with sqlworkbench-team as marketing_group.

You can use the IAM console to attach IAM policies to an IAM user or an IAM role. After you attach a policy to a role, you can attach the role to an IAM user.

To attach the IAM policies to an IAM role, complete the following steps:

  1. On the IAM console, choose Roles.
  2. Choose the role that needs access to Query Editor v2. Assume the name of the role is marketing_role.
  3. Choose Attach policies.
  4. For Policy names, choose the policies that we described previously based on your requirement.
  5. Choose Attach policy.

Now you can add the sqlworkbench-team tag to the IAM role.

  1. In the navigation pane, choose Roles and select the name of the role that you want to edit.
  2. Choose the Tags tab and choose Add tags.
  3. Add the tag key sqlworkbench-team and the value marketing_group.
  4. Choose Save changes.

Now the end-users with marketing_role can access Query Editor v2 with limited sharing of resources.
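
If you prefer to script these steps, the following is a minimal sketch using boto3; it assumes the marketing_role IAM role already exists:

import boto3

iam = boto3.client('iam')
ROLE_NAME = 'marketing_role'

# Attach the managed policy that allows limited (read-only) sharing within a team
iam.attach_role_policy(
    RoleName=ROLE_NAME,
    PolicyArn='arn:aws:iam::aws:policy/AmazonRedshiftQueryEditorV2ReadSharing',
)

# Tag the role so Query Editor v2 can resolve the team membership
iam.tag_role(
    RoleName=ROLE_NAME,
    Tags=[{'Key': 'sqlworkbench-team', 'Value': 'marketing_group'}],
)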

Work with Query Editor v2

You can use Query Editor v2 to author and run queries, visualize results, and share your work with your team. With Query Editor v2, you can create databases, schemas, tables, and user-defined functions (UDFs) with visual wizards. In a tree-view panel, for each of your clusters, you can view its schemas. For each schema, you can view its tables, views, functions (UDFs), and stored procedures.

Open Query Editor v2

After you log in to the console and navigate to Query Editor v2, you see a page like the following screenshot.

Query Editor v2 now provides a more IDE-like experience to its users and offers both dark and light themes. You can switch between themes by choosing the moon icon at the lower left of the page.

The left navigation pane shows the list of clusters that you have access to. If you don’t have an Amazon Redshift cluster, use the Getting Started with Amazon Redshift cluster with sample data option. In this post, we use the sample data (Tickets database) as examples.

Connect to an Amazon Redshift database

You can connect to a cluster by choosing a cluster and entering your credentials.

You can connect using either a database user name and password or temporary credentials. Query Editor v2 creates a secret on your behalf stored in AWS Secrets Manager. This secret contains credentials to connect to your database. With temporary credentials, Query Editor v2 generates a temporary password to connect to the database.

Browse a database

You can browse one or more databases in the cluster that you’re connected to. Within a database, you can manage schemas, tables, views, functions, and stored procedures in the tree-view panel. If you have integrated your cluster with the AWS Glue Data Catalog, you see the Data Catalog schema and external tables. Similarly, you can browse the external tables if you create external schemas using Amazon Redshift data sharing, Amazon Redshift Spectrum, or federated queries.

You can perform an operation on an object by opening its context menu (right-click) and choosing from the menu options.

Author and run queries

Query Editor v2 allows you to run your queries by selecting a specific database. If you have multiple databases, make sure that you choose the correct database.

You can enter a query in the editor or select a saved query from the Queries list and choose Run. The query editor provides several shortcuts, which you can access by choosing the content assist option.

By default, Limit 100 is set to limit the results to 100 rows. You can turn off this option to return a more extensive result set; if you do, you can include the LIMIT clause in your SQL statement to avoid very large result sets.

Use multiple SQL statements in a query

The query editor supports multiple queries, session variables, and temporary tables. If you have multiple SQL statements and you run the query, the results are displayed on various tabs.

Run long queries

You don’t have to wait for long queries to complete to view results. The queries run even if the browser window is closed. You can view the results the next time you log in to Query Editor v2.

Run parameterized queries

You can use parameters with your query instead of hardcoding certain values, as in the following code:

SELECT sum(qtysold) 
FROM   sales, date 
WHERE  sales.dateid = date.dateid 
AND    sellerId >= ${sellerid};

When you run a query with a parameter, you’re prompted with a form.

Run the explain plan

You can optimize your queries by turning on the Explain option to display a query plan in the results area. You can choose Save to save the query to the Queries folder.

Export results

You can export the query results on the current page to a file in JSON or CSV format. To save the file in the format you want, open the context menu (right-click) in the results area, then choose Export current page and either JSON or CSV. You can also select rows and export the results for specific rows.

Visual analysis of your results

You can perform a visual analysis of your results for a query by turning on Chart to display a graphic visualization of the results. Choose Traces to display the results as a chart. For Type, choose the style of chart, such as Bar or Line. For Orientation, you can choose Vertical or Horizontal. For X, select the table column that you want to use for the horizontal axis. For Y, choose the table column that you want to use for the vertical axis.

Choose Refresh to update the chart display. Choose Fullscreen to expand the chart display.

To create a chart, complete the following steps:

  1. Run a query and get results.
  2. Turn on Chart.
  3. Choose a chart style from the available options.
  4. Choose Trace and start to visualize your data.
  5. Choose Style to customize the appearance, including colors, axes, legend, and annotations.
  6. Choose Annotations to add text, shapes, and images.

For certain chart types, you can add transforms to filter, split, aggregate, and sort the underlying data for the chart.

You can also save, export, and browse the charts you created.

Collaborate and share with your team members

You can share queries with others on your team. As we discussed earlier, an administrator sets up a team based on the IAM policy associated with an IAM user or IAM role. For example, if you’re a member of marketing_group, you can share your queries with your team members.

Save, organize, and browse queries

Before you can share your query with your team, save your query. You can also view and delete saved queries.

To save your query, choose Save, enter a title, and choose Save again.

To browse for saved queries, choose Queries from the navigation pane. You can view queries that are My queries, Shared by me, or Shared to my team. These queries can appear as individual queries or within folders you created.

Organize your queries with folders

You can organize your queries by creating folders and dragging and dropping a saved query to a folder.

Share a query

You can share your queries with your team.

  1. Choose Queries in the navigation pane.
  2. Open the context menu (right-click) of the query that you want to share.
  3. Choose Share with my team.

Manage query versions

You can also view the history of saved queries and manage query versions. Every time you save an SQL query, Query Editor v2 saves it as a new version. You can store up to 20 versions of your query, browse earlier query versions, save a copy of a query, or restore a query.

  1. Choose Queries in the navigation pane.
  2. Open the context menu (right-click) for the query that you want to work with.
  3. Choose Version history to open a list of versions of the query.
  4. On the Version history page, choose one of the following options:
    • Revert to selected – Revert to the selected version and continue your work with this version.
    • Save selected as – Create a new query in the editor.

Conclusion

In this post, we introduced you to Amazon Redshift Query Editor v2, which provides a rich set of features to manage and run your SQL statements securely, such as the ability to browse and explore multiple databases, external tables, views, stored procedures, and user-defined functions. It provides wizards to create schemas, tables, and user-defined functions. Query Editor v2 simplifies the management and collaboration of saved queries and improves the ability to analyze and visualize results with a single click.

If you have any questions or suggestions, please leave a comment.

Happy querying!


About the Author

Srikanth Sopirala is a Principal Analytics Specialist Solutions Architect at AWS. He is a seasoned leader with over 20 years of experience, who is passionate about helping customers build scalable data and analytics solutions to gain timely insights and make critical business decisions. In his spare time, he enjoys reading, spending time with his family, and road cycling.

Debu Panda, a Principal Product Manager at AWS, is an industry leader in analytics, application platform, and database technologies, and has more than 25 years of experience in the IT world. Debu has published numerous articles on analytics, enterprise Java, and databases and has presented at multiple conferences such as re:Invent, Oracle Open World, and Java One. He is lead author of the EJB 3 in Action (Manning Publications 2007, 2014) and Middleware Management (Packt).

Eren Baydemir, a Technical Product Manager at AWS, has 15 years of experience in building customer facing products and is currently creating data analytics solutions in the Amazon Redshift team. He was the CEO and co-founder of DataRow which was acquired by Amazon in 2020.

Erol Murtezaoglu, a Technical Product Manager at AWS, is an inquisitive and enthusiastic thinker with a drive for self improvement and learning. He has a strong and proven technical background in software development and architecture, balanced with a drive to deliver commercially successful products. Erol highly values the process of understanding customer needs and problems, in order to deliver solutions that exceed expectations.

Introducing Amazon Redshift Query Editor V2, a Free Web-based Query Authoring Tool for Data Analysts

Post Syndicated from Alex Casalboni original https://aws.amazon.com/blogs/aws/amazon-redshift-query-editor-v2-web-query-authoring/

When it comes to manipulating and analyzing relational data, Structured Query Language (SQL) has been an international standard since 1986, a couple of years before I was born. And yet, it sometimes takes hours to get access to a new database or data warehouse, configure credentials or single sign-on, download and install multiple desktop libraries or drivers, and get familiar with the new schema—all this before you even run a query. Not to mention the challenge of sharing queries, results, and analyses securely between members of the same team or across teams.

Today, I’m glad to announce the general availability of Amazon Redshift Query Editor V2, a web-based tool that you can use to explore, analyze, share, and collaborate on data stored on Amazon Redshift using SQL. It supports data warehouses on Amazon Redshift and data lakes through Amazon Redshift Spectrum.

Amazon Redshift Query Editor V2 provides a free serverless web interface that reduces the operational costs of managing query tools and infrastructure. Because it’s a managed SQL editor in your browser and it’s integrated with your single sign-on provider, the Query Editor V2 reduces the number of steps to the first query so you gain insights faster. You also get in-place visual analysis of query results (no data download required), all in one place. As an additional team productivity boost, it improves collaboration with saved queries and the ability to share results and analyses between users.

From a security standpoint, analysts can access Query Editor V2 without requiring any admin privileges on the Amazon Redshift cluster, using an IAM role for READ, WRITE, or ADMIN access. Check out the documentation for more details.

Connection Setup for Amazon Redshift Query Editor V2
First, you’ll need to configure the connection to your Amazon Redshift cluster.

After you have configured the connection, you can reuse it for future sessions. And, of course, you can edit or delete a connection at any time.

Simply click on a cluster to connect with Query Editor V2.

Amazon Redshift Query Editor V2 in Action
The web interface allows you to browse schemas, tables, views, functions, and stored procedures. You can also preview a table’s columns with one click and create or delete schemas, tables, or functions.

The interface is intuitive for newcomers and expert users alike. You can resize panels, create tabs, and configure your editor preferences.

Running or explaining a query is quite straightforward: You simply write (or paste) the query and choose Run. You can visualize and interact with the result set in the bottom pane. For example, you might want to change the row ordering or search for a specific word. Even though Amazon Redshift Query Editor V2 is a browser-based tool, the data movement between your browser and the Amazon Redshift cluster is optimized, so your browser doesn’t need to download any raw data. A lot of the filtering and reordering happens directly in the browser, without any wait time.

To export a result set as a JSON or CSV file on your local machine, simply right-click it.

So far so good! Running queries is the minimum you’d expect from a Query Editor. Let’s have a look at some of the more interesting features.

Team Collaboration with Amazon Redshift Query Editor V2
Amazon Redshift Query Editor V2 allows you to manage the permissions of your team members based on their IAM roles, so that you can easily share queries and cluster access in a secure way.

For example, you can use IAM managed policies such as AmazonRedshiftQueryEditorV2FullAccess, AmazonRedshiftQueryEditorV2ReadSharing, or AmazonRedshiftQueryEditorV2ReadWriteSharing. Also, don’t forget to include the redshift:GetClusterCredentials permission.
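
As an illustration, a minimal sketch of adding the redshift:GetClusterCredentials permission as an inline policy with boto3 might look like the following; the role name, account, Region, and cluster are placeholders, and in practice you would scope the resource down to specific database users:

import json
import boto3

iam = boto3.client('iam')

policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": "redshift:GetClusterCredentials",
        # Placeholder account, Region, and cluster name
        "Resource": "arn:aws:redshift:us-east-1:111122223333:dbuser:my-cluster/*"
    }]
}

iam.put_role_policy(
    RoleName='analyst_role',                      # placeholder role name
    PolicyName='QueryEditorV2ClusterCredentials',
    PolicyDocument=json.dumps(policy),
)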

After you’ve set up the IAM roles for your team, choose Save to save a query.

The Untitled tab will show the query name. From now on, you edit this saved query to make updates and then choose Save again.

Individual users with WRITE access can run, edit, and delete shared queries, while users with READ access can only run shared queries.

If you work on multiple projects and collaborate with many different teams, it might be difficult to remember query names or even find them in a long list. In Amazon Redshift Query Editor V2, saved and shared queries are available from the left navigation in Queries. You can keep your queries organized into folders. Even nested folders are supported.

Last but not least, each saved query is versioned and the version history is always available. That’s pretty useful when you need to restore an older version.

Plot Your Queries with Amazon Redshift Query Editor V2
Sharing queries with teammates is great, but wouldn’t it be even better if you could visualize a result set, export it as PNG or JPEG, and save the chart for later? Amazon Redshift Query Editor V2 allows you to perform in-place visualizations of your results. When you’re happy with the look and feel of your chart, you can save it for later and organize all your saved charts into folders. This allows you to simply choose a saved chart, rerun the corresponding query, and export the new image. No need to configure the plot from scratch or remember the configuration of hundreds of charts and queries across different projects.

Available Today
Amazon Redshift Query Editor V2 is available today in all commercial AWS Regions, except the AP-Northeast-3 Region. It requires no license and it’s free, except for the cost of your Amazon Redshift cluster.

You can interact with the service using the Amazon Redshift console. It doesn’t require any driver or software on your local machine.

For more information, see the Amazon Redshift Query Editor V2 technical documentation or take a look at this video:

We look forward to your feedback.

Alex

Integral Ad Science secures self-service data lake using AWS Lake Formation

Post Syndicated from Mat Sharpe original https://aws.amazon.com/blogs/big-data/integral-ad-science-secures-self-service-data-lake-using-aws-lake-formation/

This post is co-written with Mat Sharpe, Technical Lead, AWS & Systems Engineering from Integral Ad Science.

Integral Ad Science (IAS) is a global leader in digital media quality. The company’s mission is to be the global benchmark for trust and transparency in digital media quality for the world’s leading brands, publishers, and platforms. IAS does this through data-driven technologies with actionable real-time signals and insight.

In this post, we discuss how IAS uses AWS Lake Formation and Amazon Athena to efficiently manage governance and security of data.

The challenge

IAS processes over 100 billion web transactions per day. With strong growth and changing seasonality, IAS needed a solution to reduce cost, eliminate idle capacity during low utilization periods, and maximize data processing speeds during peaks to ensure timely insights for customers.

In 2020, IAS deployed a data lake in AWS, storing data in Amazon Simple Storage Service (Amazon S3), cataloging its metadata in the AWS Glue Data Catalog, ingesting and processing using Amazon EMR, and using Athena to query and analyze the data. IAS wanted to create a unified data platform to meet its business requirements. Additionally, IAS wanted to enable self-service analytics for customers and users across multiple business units, while maintaining critical controls over data privacy and compliance with regulations such as GDPR and CCPA. To accomplish this, IAS needed to securely ingest and organize real-time and batch datasets, as well as secure and govern sensitive customer data.

To meet the dynamic nature of IAS’s data and use cases, the team needed a solution that could define access controls by attribute, such as classification of data and job function. IAS processes significant volumes of data and this continues to grow. To support the volume of data, IAS needed the governance solution to scale in order to create and secure many new daily datasets. This meant IAS could enable self-service access to data from different tools, such as development notebooks, the AWS Management Console, and business intelligence and query tools.

To address these needs, IAS evaluated several approaches, including a manual ticket-based onboarding process to define permissions on new datasets, many different AWS Identity and Access Management (IAM) policies, and an AWS Lambda based approach to automate defining Lake Formation table and column permissions triggered by changes in security requirements and the arrival of new datasets.

Although these approaches worked, they were complex and didn’t support the self-service experience that IAS data analysts required.

Solution overview

IAS selected Lake Formation, Athena, and Okta to solve this challenge. The following architectural diagram shows how the company chose to secure its data lake.

The solution needed to support data producers and consumers in multiple AWS accounts. For brevity, this diagram shows a central data lake producer that includes a set of S3 buckets for raw and processed data. Amazon EMR is used to ingest and process the data, and all metadata is cataloged in the data catalog. The data lake consumer account uses Lake Formation to define fine-grained permissions on datasets shared by the producer account; users logging in through Okta can run queries using Athena and be authorized by Lake Formation.

Lake Formation enables column-level control, and all Amazon S3 access is provisioned via a Lake Formation data access role in the query account, ensuring only that service can access the data. Each business unit with access to the data lake is provisioned with an IAM role that only allows limited access to:

  • That business unit’s Athena workgroup
  • That workgroup’s query output bucket
  • The lakeformation:GetDataAccess API

Because Lake Formation manages all the data access and permissions, the configuration of the user’s role policy in IAM becomes very straightforward. By defining an Athena workgroup per business unit, IAS also takes advantage of assigning per-department billing tags and query limits to help with cost management.
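
A minimal sketch of provisioning such a workgroup with boto3 might look like the following; the workgroup name, output bucket, tag, and query limit are illustrative:

import boto3

athena = boto3.client('athena')

athena.create_work_group(
    Name='business-unit-a',                        # illustrative business unit
    Configuration={
        'ResultConfiguration': {'OutputLocation': 's3://bu-a-query-results/'},
        'EnforceWorkGroupConfiguration': True,
        'BytesScannedCutoffPerQuery': 1024 ** 4,   # example 1 TB per-query limit
    },
    Tags=[{'Key': 'department', 'Value': 'business-unit-a'}],  # per-department billing tag
)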

Define a tag strategy

IAS commonly deals with two types of data: data generated by the company and data from third parties. The latter usually includes contractual stipulations on privacy and use.

Some datasets require even tighter controls, and defining a tag strategy is one key way that IAS ensures compliance with data privacy standards. With the tag-based access controls in Lake Formation, IAS can define a set of tags within an ontology that is assigned to tables and columns. This ensures users understand available data and whether or not they have access. It also helps IAS manage privacy permissions across numerous tables, with new ones added every day.

At a simplistic level, we can define policy tags for class with private and non-private, and for owner with internal and partner.

As we progressed, our tagging ontology evolved to include individual data owners and data sources within our product portfolio.
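
A minimal sketch of defining these two tag keys with boto3 might look like the following:

import boto3

lakeformation = boto3.client('lakeformation')

# Define the tag keys and their allowed values from the ontology described above
lakeformation.create_lf_tag(TagKey='class', TagValues=['private', 'non-private'])
lakeformation.create_lf_tag(TagKey='owner', TagValues=['internal', 'partner'])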

Apply tags to data assets

After IAS defined the tag ontology, the team applied tags at the database, table, and column level to manage permissions. Tags are inherited, so they only need to be applied at the highest level. For example, IAS applied the owner and class tags at the database level and relied on inheritance to propagate the tags to all the underlying tables and columns. The following diagram shows how IAS activated a tagging strategy to distinguish between internal and partner datasets, while classifying sensitive information within these datasets.

Only a small number of columns contain sensitive information; IAS relied on inheritance to apply a non-private tag to the majority of the database objects and then overrode it with a private tag on a per-column basis.
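A sketch of that pattern with the AWS CLI follows; the database, table, and column names are hypothetical.

# Tag the whole database; its tables and columns inherit these values
aws lakeformation add-lf-tags-to-resource \
    --resource '{"Database":{"Name":"partner_db"}}' \
    --lf-tags '[{"TagKey":"owner","TagValues":["partner"]},{"TagKey":"class","TagValues":["non-private"]}]'

# Override the inherited value with a private tag on a sensitive column
aws lakeformation add-lf-tags-to-resource \
    --resource '{"TableWithColumns":{"DatabaseName":"partner_db","Name":"events","ColumnNames":["ip_address"]}}' \
    --lf-tags '[{"TagKey":"class","TagValues":["private"]}]'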

The following screenshot shows the tags applied to a database on the Lake Formation console.

With its global scale, IAS needed a way to automate how tags are applied to datasets. The team experimented with various options including string matching on column names, but the results were unpredictable in situations where unexpected column names are used (ipaddress vs. ip_address, for example). Ultimately, IAS incorporated metadata tagging into its existing infrastructure as code (IaC) process, which gets applied as part of infrastructure updates.

Define fine-grained permissions

The final piece of the puzzle was to define permission rules to associate with tagged resources. The initial data lake deployment involved creating permission rules for every database and table, with column exclusions as necessary. Although these were generated programmatically, it added significant complexity when the team needed to troubleshoot access issues. With Lake Formation tag-based access controls, IAS reduced hundreds of permission rules down to precisely two rules, as shown in the following screenshot.

When using multiple tags, the expressions are logically ANDed together. The preceding rules permit access only to data tagged as class: non-private and owner: internal.
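Expressed with the AWS CLI, one such rule might look like the following sketch; the account ID and role name are placeholders.

# Grant SELECT and DESCRIBE on tables whose tags satisfy class = non-private AND owner = internal
aws lakeformation grant-permissions \
    --principal DataLakePrincipalIdentifier=arn:aws:iam::111122223333:role/internal-analyst \
    --permissions SELECT DESCRIBE \
    --resource '{"LFTagPolicy":{"ResourceType":"TABLE","Expression":[{"TagKey":"class","TagValues":["non-private"]},{"TagKey":"owner","TagValues":["internal"]}]}}'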

Tags allowed IAS to simplify permission rules, making it easy to understand, troubleshoot, and audit access. The ability to easily audit which datasets include sensitive information and who within the organization has access to them made it easy to comply with data privacy regulations.
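For example, an auditor can list every table carrying the private tag with a single call, a sketch of which follows (assuming the tag ontology described earlier).

# Find all tables tagged class = private
aws lakeformation search-tables-by-lf-tags \
    --expression '[{"TagKey":"class","TagValues":["private"]}]'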

Benefits

This solution provides self-service analytics to IAS data engineers, analysts, and data scientists. Internal users can query the data lake with their choice of tools, such as Athena, while maintaining strong governance and auditing. The new approach using Lake Formation tag-based access controls reduces the integration code and manual controls required. The solution provides the following additional benefits:

  • Meets security requirements by providing column-level controls for data
  • Significantly reduces permission complexity
  • Reduces time to audit data security and troubleshoot permissions
  • Deploys data classification using existing IaC processes
  • Reduces the time it takes to onboard data users including engineers, analysts, and scientists

Conclusion

When IAS started this journey, the company was looking for a fully managed solution that would enable self-service analytics while meeting stringent data access policies. Lake Formation provided IAS with the capabilities needed to deliver on this promise for its employees. With tag-based access controls, IAS optimized the solution by reducing the number of permission rules from hundreds down to a few, making it even easier to manage and audit. IAS continues to analyze data using more tools governed by Lake Formation.


About the Authors

Mat Sharpe is the Technical Lead, AWS & Systems Engineering at IAS where he is responsible for the company’s AWS infrastructure and guiding the technical teams in their cloud journey. He is based in New York.

Brian Maguire is a Solution Architect at Amazon Web Services, where he is focused on helping customers build their ideas in the cloud. He is a technologist, writer, teacher, and student who loves learning. Brian is the co-author of the book Scalable Data Streaming with Amazon Kinesis.

Danny Gagne is a Solutions Architect at Amazon Web Services. He has extensive experience in the design and implementation of large-scale high-performance analysis systems, and is the co-author of the book Scalable Data Streaming with Amazon Kinesis. He lives in New York City.

Accelerate your data warehouse migration to Amazon Redshift – Part 4

Post Syndicated from Michael Soo original https://aws.amazon.com/blogs/big-data/part-4-accelerate-your-data-warehouse-migration-to-amazon-redshift/

This is the fourth in a series of posts. We’re excited to share dozens of new features to automate your schema conversion; preserve your investment in existing scripts, reports, and applications; accelerate query performance; and potentially reduce your overall cost to migrate to Amazon Redshift.

Check out the previous posts in the series:

Amazon Redshift is the leading cloud data warehouse. No other data warehouse makes it as easy to gain new insights from your data. With Amazon Redshift, you can query exabytes of data across your data warehouse, operational data stores, and data lake using standard SQL. You can also integrate other AWS services like Amazon EMR, Amazon Athena, Amazon SageMaker, AWS Glue, AWS Lake Formation, and Amazon Kinesis to use all the analytic capabilities in the AWS Cloud.

Many customers have asked for help migrating from self-managed data warehouse engines, like Teradata, to Amazon Redshift. In these cases, you typically have terabytes or petabytes of data, a heavy reliance on proprietary features, and thousands of extract, transform, and load (ETL) processes and reports built over a few years (or decades) of use.

Until now, migrating a data warehouse to AWS was complex and involved a significant amount of manual effort. You needed to manually remediate syntax differences, inject code to replace proprietary features, and manually tune the performance of queries and reports on the new platform.

For example, you may have a significant investment in BTEQ (Basic Teradata Query) scripting for database automation, ETL, or other tasks. Previously, you needed to manually recode these scripts as part of the conversion process to Amazon Redshift. Together with supporting infrastructure (job scheduling, job logging, error handling), this was a significant impediment to migration.

Today, we’re happy to share with you a new, purpose-built command line tool called Amazon Redshift RSQL. Some of the key features added in Amazon Redshift RSQL are enhanced flow control syntax and single sign-on support. You can also describe properties or attributes of external tables in an AWS Glue catalog or Apache Hive Metastore, external databases in Amazon RDS for PostgreSQL or Amazon Aurora PostgreSQL-Compatible Edition, and tables shared using Amazon Redshift data sharing.

We have also enhanced the AWS Schema Conversion Tool (AWS SCT) to automatically convert BTEQ scripts to Amazon Redshift RSQL scripts. The converted scripts run on Amazon Redshift with little to no changes.

In this post, we describe some of the features of Amazon Redshift RSQL, show example scripts, and demonstrate how to convert BTEQ scripts into Amazon Redshift RSQL scripts.

Amazon Redshift RSQL features

If you currently use Amazon Redshift, you may already be running scripts against it using the PSQL command line client. These scripts run with Amazon Redshift RSQL without modification. You can think of Amazon Redshift RSQL as an Amazon Redshift-native version of PSQL.

In addition, we have designed Amazon Redshift RSQL to make it easy to transition BTEQ scripts to the tool. The following are some examples of Amazon Redshift RSQL commands that make this possible. (For full details, see Amazon Redshift RSQL.)

  • \EXIT – This command is an extension of the PSQL \quit command. Like \quit, \EXIT terminates the execution of Amazon Redshift RSQL. In addition, you can specify an optional exit code with \EXIT.
  • \LOGON – This command creates a new connection to a database. \LOGON is an alias for the PSQL \connect command. You can specify connection parameters using positional syntax or as a connection string.
  • \REMARK – This command prints the specified string to the output. \REMARK extends the PSQL \echo command by adding the ability to break the output over multiple lines, using // as a line break.
  • \RUN – This command runs the Amazon Redshift RSQL script contained in the specified file. \RUN extends the PSQL \i command by adding an option to skip any number of lines in the specified file.
  • \OS – This is an alias for the PSQL \! command. \OS runs the operating system command that is passed as a parameter. Control returns to Amazon Redshift RSQL after running the OS command.
  • \LABEL – This is a new command for Amazon Redshift RSQL. \LABEL establishes an entry point for execution, as the target for a \GOTO command.
  • \GOTO – This is a new command for Amazon Redshift RSQL. It’s used in conjunction with the \LABEL command. \GOTO skips all intervening commands and resumes processing at the specified \LABEL. The \LABEL must be a forward reference. You can’t jump to a \LABEL that lexically precedes the \GOTO.
  • \IF (\ELSEIF, \ELSE, \ENDIF) – This command is an extension of the PSQL \if (\elif, \else, \endif) command. \IF and \ELSEIF support arbitrary Boolean expressions including AND, OR, and NOT conditions. You can use the \GOTO command within a \IF block to control conditional execution.
  • \EXPORT – This command specifies the name of an export file that Amazon Redshift RSQL uses to store database information returned by a subsequent SQL SELECT statement.

We’ve also added some variables to Amazon Redshift RSQL to support converting your BTEQ scripts.

  • :ACTIVITYCOUNT – This variable returns the number of rows affected by the last submitted request. For a data-returning request, this is the number of rows returned to Amazon Redshift RSQL from the database. ACTIVITYCOUNT is similar to the PSQL variable ROW_COUNT; however, ROW_COUNT doesn’t report the affected-row count for SELECT, COPY, or UNLOAD.
  • :ERRORCODE – This variable contains the return code for the last submitted request to the database. A zero signifies the request completed without error. The ERRORCODE variable is an alias for the variable SQLSTATE.
  • :ERRORLEVEL – This variable assigns severity levels to errors. Use the severity levels to determine a course of action based on the severity of the errors that Amazon Redshift RSQL encounters.
  • :MAXERROR – This variable designates a maximum error severity level beyond which Amazon Redshift RSQL terminates job processing.
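For example, a minimal error-handling sketch that combines these variables with the flow control commands described earlier might look like the following. The table names are illustrative, and the check relies on ERRORCODE being zero when the request completes without error, as noted above.

INSERT INTO testschema.daily_load SELECT * FROM testschema.staging_load;

\if :ERRORCODE = 0
  \remark '****Load succeeded****'
\else
  \remark '****Load failed, stopping****'
  \exit 1
\endif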

An example Amazon Redshift RSQL script

Let’s look at an example. First, we log in to an Amazon Redshift database using Amazon Redshift RSQL. You specify the connection information on the command line as shown in the following code. The port and database are optional and default to 5439 and dev, respectively, if not provided.

$ rsql -h testcluster1.<example>.redshift.amazonaws.com -U testuser1 -d myredshift -p 5439
Password for user testuser1: 
DSN-less Connected
DBMS Name: Amazon Redshift
Driver Name: Amazon Redshift ODBC Driver
Driver Version: 1.4.34.1000
Rsql Version: 1.0.1
Redshift Version: 1.0.29551
Type "help" for help.

(testcluster1) testuser1@myredshift=#

If you choose to change the connection from within the client, you can use the \LOGON command:

(testcluster1) testuser1@myredshift=# \logon testschema testuser2 testcluster2.<example>.redshift.amazonaws.com
Password for user testuser2: 
DBMS Name: Amazon Redshift
Driver Name: Amazon Redshift ODBC Driver
Driver Version: 1.4.34.1000
Rsql Version: 1.0.1

Now, let’s run a simple script that runs a SELECT statement, checks for output, then branches depending on whether data was returned or not.

First, we inspect the script by using the \OS command to print the file to the screen:

(testcluster1) testuser1@myredshift=# \os cat activitycount.sql
select * from testschema.employees;

\if :ACTIVITYCOUNT = 0
  \remark '****No data found****'
  \goto LETSQUIT
\else
  \remark '****Data found****'
  \goto LETSDOSOMETHING
\endif

\label LETSQUIT
\remark '****We are quitting****'
\exit 0

\label LETSDOSOMETHING
\remark '****We are doing it****'
\exit 0

The script prints one of two messages depending on whether data is returned by the SELECT statement or not.

Now, let’s run the script using the \RUN command. The SELECT statement returns 11 rows of data. The script prints a “data found” message, and jumps to the LETSDOSOMETHING label.

(testcluster1) testuser1@myredshift=# \run file=activitycount.sql
  id  | name    | manager_id | last_promo_date
 -----+---------+------------+-----------------
 112  | Britney | 201        | 2041-03-30
 101  | Bob     | 100        |
 110  | Mark    | 201        |
 106  | Jeff    | 102        |
 201  | Ana     | 104        |
 104  | Chris   | 103        |
 111  | Phyllis | 103        |
 102  | Renee   | 101        | 2021-01-01
 100  | Caitlin |            | 2021-01-01
 105  | David   | 103        | 2021-01-01
 103  | John    | 101        |
 (11 rows)

****Data found****
\label LETSQUIT ignored
\label LETSDOSOMETHING processed
****We are doing it****

That’s Amazon Redshift RSQL in a nutshell. If you’re developing new scripts for Amazon Redshift, we encourage you to use Amazon Redshift RSQL and take advantage of its additional capabilities. If you have existing PSQL scripts, you can run those scripts using Amazon Redshift RSQL with no changes.

Use AWS SCT to automate your BTEQ conversions

If you’re a Teradata developer or DBA, you’ve probably built a library of BTEQ scripts that you use to perform administrative work, load or transform data, or to generate datasets and reports. If you’re contemplating a migration to Amazon Redshift, you’ll want to preserve the investment you made in creating those scripts.

AWS SCT has long had the ability to convert BTEQ to AWS Glue. Now, you can also use AWS SCT to automatically convert BTEQ scripts to Amazon Redshift RSQL. AWS SCT supports all the new Amazon Redshift RSQL features like conditional execution, escape to the shell, and branching.

Let’s see how it works. We create two Teradata tables, product_stg and product. Then we create a simple ETL script that uses a MERGE statement to update the product table using data from the product_stg table:

CREATE TABLE testschema.product_stg (
  prod_id INTEGER
, description VARCHAR(100) CHARACTER SET LATIN
, category_id INTEGER)
UNIQUE PRIMARY INDEX ( prod_id );

CREATE TABLE testschema.product (
  prod_id INTEGER
, description VARCHAR(100) CHARACTER SET LATIN
, category_id INTEGER)
UNIQUE PRIMARY INDEX ( prod_id );

We embed the MERGE statement inside a BTEQ script. The script tests error conditions and branches accordingly:

.SET WIDTH 100

SELECT COUNT(*) 
FROM testschema.product_stg 
HAVING COUNT(*) > 0;

.IF ACTIVITYCOUNT = 0 then .GOTO NODATA;

MERGE INTO testschema.product tgt 
USING testschema.product_stg stg 
   ON tgt.prod_id = stg.prod_id
WHEN MATCHED THEN UPDATE SET
      description = stg.description
    , category_id = stg.category_id
WHEN NOT MATCHED THEN INSERT VALUES (
  stg.prod_id
, stg.description
, stg.category_id
);

.GOTO ALLDONE;

.LABEL NODATA

.REMARK 'Staging table is empty. Stopping'

.LABEL ALLDONE 

.QUIT;               

Now, let’s use AWS SCT to convert the script to Amazon Redshift RSQL. AWS SCT converts the BTEQ commands to their Amazon Redshift RSQL and Amazon Redshift equivalents. The converted script is as follows:

\rset width 100
SELECT
    COUNT(*)
    FROM testschema.product_stg
    HAVING COUNT(*) > 0;
\if :ACTIVITYCOUNT = 0
    \goto NODATA
\endif
UPDATE testschema.product
SET description = stg.description, category_id = stg.category_id
FROM testschema.product_stg AS stg
JOIN testschema.product AS tgt
    ON tgt.prod_id = stg.prod_id;
INSERT INTO testschema.product
SELECT
    stg.prod_id, stg.description, stg.category_id
    FROM testschema.product_stg AS stg
    WHERE NOT EXISTS (
        SELECT 1
        FROM testschema.product AS tgt
        WHERE tgt.prod_id = stg.prod_id);
\goto ALLDONE
\label NODATA
\remark 'Staging table is empty. Stopping'
\label ALLDONE
\quit :ERRORLEVEL

The following are the main points of interest in the conversion:

  • The BTEQ .SET WIDTH command is converted to the Amazon Redshift RSQL \RSET WIDTH command.
  • The BTEQ ACTIVITYCOUNT variable is converted to the Amazon Redshift RSQL ACTIVITYCOUNT variable.
  • The BTEQ MERGE statement is converted into an UPDATE followed by an INSERT statement. Currently, Amazon Redshift doesn’t support a native MERGE statement.
  • The BTEQ .LABEL and .GOTO statements are translated to their Amazon Redshift RSQL equivalents \LABEL and \GOTO.

Let’s look at the actual process of using AWS SCT to convert a BTEQ script.

After starting AWS SCT, you create a Teradata migration project and navigate to the BTEQ scripts node in the source tree window pane. Right-click and choose Load scripts.

Then select the folder that contains your BTEQ scripts. The folder appears in the source tree. Open it and navigate to the script you want to convert. In our case, the script is contained in the file merge.sql. Right-click the file, choose Convert script, then choose Convert to RSQL. You can inspect the converted script in the bottom middle pane. When you’re ready to save the script to a file, do that from the target tree on the right side.

If you have many BTEQ scripts, you can convert an entire folder at once by selecting the folder instead of an individual file.

Convert shell scripts

Many applications run BTEQ commands from within shell scripts. For example, you may have a shell script that redirects log output and controls login credentials, as in the following:

bteq <<EOF >> ${LOG} 2>&1

.run file $LOGON;

SELECT COUNT(*) 
FROM testschema.product_stg 
HAVING COUNT(*) > 0;
…

EOF

If you use shell scripts to run BTEQ, we’re happy to share that AWS SCT can help you convert those scripts. AWS SCT supports bash scripts now, and we’ll add additional shell dialects in the future.
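For example, a converted wrapper might swap the bteq invocation for rsql and pass the connection details on the command line. The following is a rough sketch rather than AWS SCT’s exact output; the environment variables are placeholders, and credential handling is omitted.

# rsql replaces bteq; connection details come from the environment instead of the .run file
rsql -h "${REDSHIFT_HOST}" -U "${REDSHIFT_USER}" -d dev <<EOF >> ${LOG} 2>&1

SELECT COUNT(*)
FROM testschema.product_stg
HAVING COUNT(*) > 0;

EOF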

The process to convert shell scripts is very similar to BTEQ conversion. You select a folder that contains your scripts by navigating to the Shell node in the source tree and then choosing Load scripts.

After the folder is loaded, you can convert one (or more) scripts by selecting them and choosing Convert script.

As before, the converted script appears in the UI, and you can save it from the target tree on the right side of the page.

Conclusion

We’re happy to share Amazon Redshift RSQL and expect it to be a big hit with customers. If you’re contemplating a migration from Teradata to Amazon Redshift, Amazon Redshift RSQL and AWS SCT can simplify the conversion of your existing Teradata scripts and help preserve your investment in existing reports, applications, and ETL.

All of the features described in this post are available for you to use today. You can download Amazon Redshift RSQL and AWS SCT and give them a try.

We’ll be back soon with the next installment in this series. Check back for more information on automating your migrations from Teradata to Amazon Redshift. In the meantime, you can learn more about Amazon Redshift, Amazon Redshift RSQL, and AWS SCT. Happy migrating!


About the Authors

Michael Soo is a Senior Database Engineer with the AWS Database Migration Service team. He builds products and services that help customers migrate their database workloads to the AWS cloud.

Po Hong, PhD, is a Principal Data Architect of Lake House Global Specialty Practice, AWS Professional Services. He is passionate about supporting customers to adopt innovative solutions to reduce time to insight. Po is specialized in migrating large scale MPP on-premises data warehouses to the AWS Lake House architecture.

Entong Shen is a Software Development Manager of Amazon Redshift. He has been working on MPP databases for over 9 years and has focused on query optimization, statistics and migration related SQL language features such as stored procedures and data types.

Adekunle Adedotun is a Sr. Database Engineer with Amazon Redshift service. He has been working on MPP databases for 6 years with a focus on performance tuning. He also provides guidance to the development team for new and existing service features.

Asia Khytun is a Software Development Manager for the AWS Schema Conversion Tool. She has 10+ years of software development experience in C, C++, and Java.

Illia Kratsov is a Database Developer with the AWS Project Delta Migration team. He has 10+ years experience in data warehouse development with Teradata and other MPP databases.

Accelerate your data warehouse migration to Amazon Redshift – Part 3

Post Syndicated from Michael Soo original https://aws.amazon.com/blogs/big-data/part-3-accelerate-your-data-warehouse-migration-to-amazon-redshift/

This is the third post in a multi-part series. We’re excited to share dozens of new features to automate your schema conversion; preserve your investment in existing scripts, reports, and applications; accelerate query performance; and reduce your overall cost to migrate to Amazon Redshift.

Check out the previous posts in the series:

Amazon Redshift is the leading cloud data warehouse. No other data warehouse makes it as easy to gain new insights from your data. With Amazon Redshift, you can query exabytes of data across your data warehouse, operational data stores, and data lake using standard SQL. You can also integrate other services such as Amazon EMR, Amazon Athena, and Amazon SageMaker to use all the analytic capabilities in the AWS Cloud.

Many customers have asked for help migrating from self-managed data warehouse engines, like Teradata, to Amazon Redshift. In these cases, you may have terabytes (or petabytes) of historical data, a heavy reliance on proprietary features, and thousands of extract, transform, and load (ETL) processes and reports built over years (or decades) of use.

Until now, migrating a Teradata data warehouse to AWS was complex and involved a significant amount of manual effort.

Today, we’re happy to share recent enhancements to Amazon Redshift and the AWS Schema Conversion Tool (AWS SCT) that make it easier to automate your Teradata to Amazon Redshift migrations.

In this post, we introduce new automation for merge statements, a native function to support ASCII character conversion, enhanced error checking for string to date conversion, enhanced support for Teradata cursors and identity columns, automation for ANY and SOME predicates, automation for RESET WHEN clauses, automation for two proprietary Teradata functions (TD_NORMALIZE_OVERLAP and TD_UNPIVOT), and automation to support analytic functions (QUANTILE and QUALIFY).

Merge statement

Like its name implies, the merge statement takes an input set and merges it into a target table. If an input row already exists in the target table (a row in the target table has the same primary key value), then the target row is updated. If there is no matching target row, the input row is inserted into the table.

Until now, if you used merge statements in your workload, you were forced to manually rewrite the merge statement to run on Amazon Redshift. Now, we’re happy to share that AWS SCT automates this conversion for you. AWS SCT decomposes a merge statement into an update on existing records followed by an insert for new records.

Let’s look at an example. We create two tables in Teradata: a target table, employee, and a delta table, employee_delta, where we stage the input rows:

CREATE TABLE testschema.employee(
  id INTEGER
, name VARCHAR(20)
, manager INTEGER)
UNIQUE PRIMARY INDEX (id)
;

CREATE TABLE testschema.employee_delta (
  id INTEGER
, name VARCHAR(20)
, manager INTEGER)
UNIQUE PRIMARY INDEX(id)
;

Now we create a Teradata merge statement that updates a row if it exists in the target, otherwise it inserts the new row. We embed this merge statement into a macro so we can show you the conversion process later.

REPLACE MACRO testschema.merge_employees AS (
  MERGE INTO testschema.employee tgt
  USING testschema.employee_delta delta
    ON delta.id = tgt.id
  WHEN MATCHED THEN
    UPDATE SET name = delta.name, manager = delta.manager
  WHEN NOT MATCHED THEN
    INSERT (delta.id, delta.name, delta.manager);
);

Now we use AWS SCT to convert the macro. (See Accelerate your data warehouse migration to Amazon Redshift – Part 1 for details on macro conversion.) AWS SCT creates a stored procedure that contains an update (to implement the WHEN MATCHED condition) and an insert (to implement the WHEN NOT MATCHED condition).

CREATE OR REPLACE PROCEDURE testschema.merge_employees()
AS $BODY$
BEGIN
    UPDATE testschema.employee
    SET name = "delta".name, manager = "delta".manager
    FROM testschema.employee_delta AS delta JOIN testschema.employee AS tgt
        ON "delta".id = tgt.id;
      
    INSERT INTO testschema.employee
    SELECT
      "delta".id
    , "delta".name
    , "delta".manager
    FROM testschema.employee_delta AS delta
    WHERE NOT EXISTS (
      SELECT 1
      FROM testschema.employee AS tgt
      WHERE "delta".id = tgt.id
    );
END;
$BODY$
LANGUAGE plpgsql;

This example showed how to use merge automation for macros, but you can convert merge statements in any application context: stored procedures, BTEQ scripts, Java code, and more. Download the latest version of AWS SCT and try it out.

ASCII() function

The ASCII function takes as input a string and returns the ASCII code, or more precisely, the UNICODE code point, of the first character in the string. Previously, Amazon Redshift supported ASCII as a leader-node only function, which prevented its use with user-defined tables.

We’re happy to share that the ASCII function is now available on Amazon Redshift compute nodes and can be used with user-defined tables. In the following code, we create a table with some string data:

CREATE TABLE testschema.char_table (
  id INTEGER
, char_col  CHAR(10)
, varchar_col VARCHAR(10)
);

INSERT INTO testschema.char_table VALUES (1, 'Hello', 'world');

Now you can use the ASCII function on the string columns:

# SELECT id, char_col, ascii(char_col), varchar_col, ascii(varchar_col) FROM testschema.char_table;

 id |  char_col  | ascii | varchar_col | ascii 
  1 | Hello      |    72 | world       |   119

Lastly, if your application code uses the ASCII function, AWS SCT automatically converts any such function calls to Amazon Redshift.

The ASCII feature is available now—try it out in your own cluster.

TO_DATE() function

The TO_DATE function converts a character string into a DATE value. A quirk of this function is that it can accept a string value that isn’t a valid date and translate it into a valid date.

For example, consider the string 2021-06-31. This isn’t a valid date because the month of June has only 30 days. However, the TO_DATE function accepts this string and returns the “31st” day of June (July 1):

# SELECT to_date('2021-06-31', 'YYYY-MM-DD');
 to_date 
 2021-07-01
(1 row)

Customers have asked for strict input checking for TO_DATE, and we’re happy to share this new capability. Now, you can include a Boolean value in the function call that turns on strict checking:

# SELECT to_date('2021-06-31', 'YYYY-MM-DD', TRUE);
ERROR: date/time field date value out of range: 2021-6-31

You can turn off strict checking explicitly as well:

# SELECT to_date('2021-06-31', 'YYYY-MM-DD', FALSE);
 to_date 
 2021-07-01
(1 row)

Also, the Boolean value is optional. If you don’t include it, strict checking is turned off, and you see the same behavior as before the feature was launched.

You can learn more about the TO_DATE function and try out strict date checking in Amazon Redshift now.

CURSOR result sets

A cursor is a programming language construct that applications use to manipulate a result set one row at a time. Cursors are more relevant for OLTP applications, but some legacy applications built on data warehouses also use them.

Teradata provides a diverse set of cursor configurations. Amazon Redshift supports a more streamlined set of cursor features.

Based on customer feedback, we’ve added automation to support Teradata WITH RETURN cursors. These types of cursors are opened within stored procedures and returned to the caller for processing of the result set. AWS SCT will convert a WITH RETURN cursor to an Amazon Redshift REFCURSOR.

For example, consider the following procedure, which contains a WITH RETURN cursor. The procedure opens the cursor and returns the result to the caller as a DYNAMIC RESULT SET:

REPLACE PROCEDURE testschema.employee_cursor (IN p_mgrid INTEGER) DYNAMIC RESULT SETS 1
BEGIN
   DECLARE result_set CURSOR WITH RETURN ONLY FOR 
     SELECT id, name, manager 
     FROM testschema.employee
     WHERE manager = to_char(p_mgrid); 
   OPEN result_set;
END;

AWS SCT converts the procedure as follows. An additional parameter is added to the procedure signature to pass the REFCURSOR:

CREATE OR REPLACE PROCEDURE testschema.employee_cursor(par_p_mgrid IN INTEGER, dynamic_return_cursor INOUT refcursor)
AS $BODY$
DECLARE
BEGIN
    OPEN dynamic_return_cursor FOR
    SELECT
        id, name, manager
        FROM testschema.employee
        WHERE manager = to_char(par_p_mgrid, '99999');
END;
$BODY$
LANGUAGE plpgsql;
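To consume the result set, the caller passes a cursor name when calling the converted procedure and fetches from that cursor inside a transaction block. The following is a minimal sketch; the manager ID and cursor name are arbitrary values.

BEGIN;

-- The second argument names the refcursor that the procedure opens
CALL testschema.employee_cursor(201, 'emp_cursor');

-- Read the rows from the returned cursor
FETCH ALL FROM emp_cursor;

COMMIT;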

IDENTITY columns

Teradata supports several non-ANSI compliant features for IDENTITY columns. We have enhanced AWS SCT to automatically convert these features to Amazon Redshift, whenever possible.

Specifically, AWS SCT now converts the Teradata START WITH and INCREMENT BY clauses to the Amazon Redshift SEED and STEP clauses, respectively. For example, consider the following Teradata table:

CREATE TABLE testschema.identity_table (
  a2 BIGINT GENERATED ALWAYS AS IDENTITY (
    START WITH 1 
    INCREMENT BY 20
  )
);

The GENERATED ALWAYS clause indicates that the column is always populated automatically—a value can’t be explicitly inserted or updated into the column. The START WITH clause defines the first value to be inserted into the column, and the INCREMENT BY clause defines the next value to insert into the column.

When you convert this table using AWS SCT, the following Amazon Redshift DDL is produced. Notice that the START WITH and INCREMENT BY values are preserved in the target syntax:

CREATE TABLE IF NOT EXISTS testschema.identity_table (
  a2 BIGINT IDENTITY(1, 20)
)
DISTSTYLE KEY
DISTKEY
(a2)
SORTKEY
(a2);

Also, by default, an IDENTITY column in Amazon Redshift only contains auto-generated values, so that the GENERATED ALWAYS property in Teradata is preserved:

# INSERT INTO testschema.identity_table VALUES (100);
ERROR:  cannot set an identity column to a value

IDENTITY columns in Teradata can also be specified as GENERATED BY DEFAULT. In this case, a value can be explicitly defined in an INSERT statement. If no value is specified, the column is filled with an auto-generated value like normal. Before, AWS SCT didn’t support conversion for GENERATED BY DEFAULT columns. Now, we’re happy to share that AWS SCT automatically converts such columns for you.

For example, the following table contains an IDENTITY column that is GENERATED BY DEFAULT:

CREATE TABLE testschema.identity_by_default (
  a1 BIGINT GENERATED BY DEFAULT AS IDENTITY (
     START WITH 1 
     INCREMENT BY 20 
  )
) PRIMARY INDEX (a1);

The IDENTITY column is converted by AWS SCT as follows. The converted column uses the Amazon Redshift GENERATED BY DEFAULT clause:

CREATE TABLE testschema.identity_by_default (
  a1 BIGINT GENERATED BY DEFAULT AS IDENTITY(1,20) DISTKEY
)                                                          
 DISTSTYLE KEY                                               
 SORTKEY (a1);

There is one additional syntax issue that requires attention. In Teradata, an auto-generated value is inserted when NULL is specified for the column value:

INSERT INTO identity_by_default VALUES (null);

Amazon Redshift uses a different syntax for the same purpose. Here, you include the keyword DEFAULT in the values list to indicate that the column should be auto-generated:

INSERT INTO testschema.identity_by_default VALUES (default);

We’re happy to share that AWS SCT automatically converts the Teradata syntax for INSERT statements like the preceding example. For example, consider the following Teradata macro:

REPLACE MACRO testschema.insert_identity_by_default AS (
  INSERT INTO testschema.identity_by_default VALUES (NULL);
);

AWS SCT removes the NULL and replaces it with DEFAULT:

CREATE OR REPLACE PROCEDURE testschema.insert_identity_by_default() LANGUAGE plpgsql
AS $$                                                              
BEGIN                                                              
  INSERT INTO testschema.identity_by_default VALUES (DEFAULT);
END;                                                               
$$                                                                 

IDENTITY column automation is available now in AWS SCT. You can download the latest version and try it out.

ANY and SOME filters with inequality predicates

The ANY and SOME filters determine if a predicate applies to one or more values in a list. For example, in Teradata, you can use <> ANY to find all employees who don’t work for a certain manager:

REPLACE MACRO testschema.not_in_103 AS (
  SELECT *
  FROM testschema.employee 
  WHERE manager <> ANY (103)
;
);

Of course, you can rewrite this query using a simple not equal filter, but you often see queries from third-party SQL generators that follow this pattern.

Amazon Redshift doesn’t support this syntax natively. Before, any queries using this syntax had to be manually converted. Now, we’re happy to share that AWS SCT automatically converts ANY and SOME clauses with inequality predicates. The macro above is converted to a stored procedure as follows.

CREATE OR REPLACE PROCEDURE testschema.not_in_103(macro_out INOUT refcursor)
AS $BODY$
BEGIN
    OPEN macro_out FOR
    SELECT *
    FROM testschema.employee
    WHERE ((manager <> 103));
END;
$BODY$
LANGUAGE plpgsql;

If the values list following the ANY contains two or more values, AWS SCT converts the filter into a series of OR conditions, one for each element in the list, as shown in the following sketch.
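This hand-written sketch shows the shape of the output rather than AWS SCT’s exact formatting.

-- Teradata: manager differs from at least one value in the list
SELECT * FROM testschema.employee WHERE manager <> ANY (103, 104);

-- Converted Amazon Redshift filter: one OR condition per list element
SELECT * FROM testschema.employee WHERE ((manager <> 103) OR (manager <> 104));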

ANY/SOME filter conversion is available now in AWS SCT. You can try it out in the latest version of the application.

Analytic functions with RESET WHEN

RESET WHEN is a Teradata feature used in SQL analytical window functions. It’s an extension to the ANSI SQL standard. RESET WHEN determines the partition over which a SQL window function operates based on a specified condition. If the condition evaluates to true, a new dynamic sub-partition is created inside the existing window partition.

For example, the following view uses RESET WHEN to compute a running total by store. The running total accumulates as long as sales increase month over month. If sales drop from one month to the next, the running total resets.

CREATE TABLE testschema.sales (
  store_id INTEGER
, month_no INTEGER
, sales_amount DECIMAL(9,2)
)
;

REPLACE VIEW testschema.running_total (
  store_id
, month_no
, sales_amount
, cume_sales_amount
)
AS
SELECT 
  store_id
, month_no
, sales_amount
, SUM(sales_amount) OVER (
     PARTITION BY store_id 
     ORDER BY month_no
     RESET WHEN sales_amount < SUM(sales_amount) OVER (
       PARTITION BY store_id
       ORDER BY month_no
       ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
     )
     ROWS UNBOUNDED PRECEDING 
  )
FROM testschema.sales;

To demonstrate, we insert some test data into the table:

INSERT INTO testschema.sales VALUES (1001, 1, 35000.00);
INSERT INTO testschema.sales VALUES (1001, 2, 40000.00);
INSERT INTO testschema.sales VALUES (1001, 3, 45000.00);
INSERT INTO testschema.sales VALUES (1001, 4, 25000.00);
INSERT INTO testschema.sales VALUES (1001, 5, 30000.00);
INSERT INTO testschema.sales VALUES (1001, 6, 30000.00);
INSERT INTO testschema.sales VALUES (1001, 7, 50000.00);
INSERT INTO testschema.sales VALUES (1001, 8, 35000.00);
INSERT INTO testschema.sales VALUES (1001, 9, 60000.00);
INSERT INTO testschema.sales VALUES (1001, 10, 80000.00);
INSERT INTO testschema.sales VALUES (1001, 11, 90000.00);
INSERT INTO testschema.sales VALUES (1001, 12, 100000.00);

The sales amounts drop after months 3 and 7. The running total is reset accordingly at months 4 and 8.

SELECT * FROM testschema.running_total;

   store_id     month_no  sales_amount  cume_sales_amount
-----------  -----------  ------------  -----------------
       1001            1      35000.00           35000.00
       1001            2      40000.00           75000.00
       1001            3      45000.00          120000.00
       1001            4      25000.00           25000.00
       1001            5      30000.00           55000.00
       1001            6      30000.00           85000.00
       1001            7      50000.00          135000.00
       1001            8      35000.00           35000.00
       1001            9      60000.00           95000.00
       1001           10      80000.00          175000.00
       1001           11      90000.00          265000.00
       1001           12     100000.00          365000.00

AWS SCT converts the view as follows. The converted code uses a subquery to emulate the RESET WHEN. Essentially, a marker attribute is added to the result that flags a month over month sales drop. The flag is then used to determine the longest preceding run of increasing sales to aggregate.

CREATE OR REPLACE VIEW testschema.running_total (
  store_id
, month_no
, sales_amount
, cume_sales_amount) AS
SELECT
  store_id
, month_no
, sales_amount
, sum(sales_amount) OVER 
    (PARTITION BY k1, store_id ORDER BY month_no NULLS 
     FIRST ROWS UNBOUNDED      PRECEDING)
FROM (
  SELECT
   store_id
 , month_no
 , sales_amount
 , SUM(CASE WHEN k = 1 THEN 0 ELSE 1 END) OVER 
     (PARTITION BY store_id ORDER BY month_no NULLS 
       FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS k1
 FROM (
   SELECT
     store_id
   , month_no
   , sales_amount
   , CASE WHEN sales_amount < SUM(sales_amount) OVER 
      (PARTITION BY store_id ORDER BY month_no 
        ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) 
      OR sales_amount IS NULL THEN 0 ELSE 1 END AS k
   FROM testschema.sales
  )
);

We expect that RESET WHEN conversion will be a big hit with customers. You can try it now in AWS SCT.

TD_NORMALIZE_OVERLAP() function

The TD_NORMALIZE_OVERLAP function combines rows that have overlapping PERIOD values. The resulting normalized row contains the earliest starting bound and the latest ending bound from the PERIOD values of all the rows involved.

For example, we create a Teradata table that records employee salaries with the following code. Each row in the table is timestamped with the period that the employee was paid the given salary.

CREATE TABLE testschema.salaries (
  emp_id INTEGER
, salary DECIMAL(8,2)
, from_to PERIOD(DATE)
);

Now we add data for two employees. For emp_id = 1 and salary = 2000, there are two overlapping rows. Similarly, the two rows with emp_id = 2 and salary = 3000 are overlapping.

SELECT * FROM testschema.salaries ORDER BY emp_id, from_to;

     emp_id      salary  from_to
-----------  ----------  ------------------------
          1     1000.00  ('20/01/01', '20/05/31')
          1     2000.00  ('20/06/01', '21/02/28')
          1     2000.00  ('21/01/01', '21/06/30')
          2     3000.00  ('20/01/01', '20/03/31')
          2     3000.00  ('20/02/01', '20/04/30')

Now we create a view that uses the TD_NORMALIZE_OVERLAP function to normalize the overlapping data:

REPLACE VIEW testschema.normalize_salaries AS 
WITH sub_table(emp_id, salary, from_to) AS (
  SELECT 
    emp_id
  , salary
  , from_to
  FROM testschema.salaries
)
SELECT *
FROM 
  TABLE(TD_SYSFNLIB.TD_NORMALIZE_OVERLAP (NEW VARIANT_TYPE(sub_table.emp_id, sub_table.salary), sub_table.from_to)
    RETURNS (emp_id INTEGER, salary DECIMAL(8,2), from_to PERIOD(DATE))
    HASH BY emp_id
    LOCAL ORDER BY emp_id, salary, from_to
  ) AS DT(emp_id, salary, duration)
;

We can check that the view data is actually normalized:

select * from testschema.normalize_salaries order by emp_id, duration;

     emp_id      salary  duration
-----------  ----------  ------------------------
          1     1000.00  ('20/01/01', '20/05/31')
          1     2000.00  ('20/06/01', '21/06/30')
          2     3000.00  ('20/01/01', '20/04/30')

You can now use AWS SCT to convert any TD_NORMALIZE_OVERLAP statements. We first convert the salaries table to Amazon Redshift (see Accelerate your data warehouse migration to Amazon Redshift – Part 2 for details about period data type automation):

CREATE TABLE testschema.salaries (
  emp_id integer distkey
, salary numeric(8,2) ENCODE az64
, from_to_begin date ENCODE az64
, from_to_end date ENCODE az64    
)                                   
DISTSTYLE KEY                       
SORTKEY (emp_id);

# SELECT * FROM testschema.salaries ORDER BY emp_id, from_to_begin;
 emp_id | salary  | from_to_begin | from_to_end 
      1 | 1000.00 | 2020-01-01    | 2020-05-31
      1 | 2000.00 | 2020-06-01    | 2021-02-28
      1 | 2000.00 | 2021-01-01    | 2021-06-30
      2 | 3000.00 | 2020-01-01    | 2020-03-31
      2 | 3000.00 | 2020-02-01    | 2020-04-30

Now we use AWS SCT to convert the normalize_salaries view. AWS SCT adds a column that marks the start of a new group of rows. It then produces a single row for each group with a normalized timestamp.

CREATE VIEW testschema.normalize_salaries (emp_id, salary, from_to_begin, from_to_end) AS
WITH sub_table AS (
  SELECT
    emp_id
  , salary
  , from_to_begin AS start_date
  , from_to_end AS end_date
  , CASE
      WHEN start_date <= lag(end_date) OVER (PARTITION BY emp_id, salary ORDER BY start_date, end_date) THEN 0 
      ELSE 1
    END AS GroupStartFlag
    FROM testschema.salaries
  )
SELECT
  t2.emp_id
, t2.salary
, min(t2.start_date) AS from_to_begin
, max(t2.end_date) AS from_to_end
FROM (
  SELECT
    emp_id
  , salary
  , start_date
  , end_date
  , sum(GroupStartFlag) OVER (PARTITION BY emp_id, salary ORDER BY start_date ROWS UNBOUNDED PRECEDING) AS GroupID
  FROM 
    sub_table
) AS t2
GROUP BY 
  t2.emp_id
, t2.salary
, t2.GroupID;

We can check that the converted view returns the correctly normalized data:

# SELECT * FROM testschema.normalize_salaries ORDER BY emp_id;
 emp_id | salary  | from_to_begin | from_to_end 
      1 | 1000.00 | 2020-01-01    | 2020-05-31
      1 | 2000.00 | 2020-06-01    | 2021-06-30
      2 | 3000.00 | 2020-01-01    | 2020-04-30

You can try out TD_NORMALIZE_OVERLAP conversion in the latest release of AWS SCT. Download it now.

TD_UNPIVOT() function

The TD_UNPIVOT function transforms columns into rows. Essentially, we use it to take a row of similar metrics over different time periods and create a separate row for each metric.

For example, consider the following Teradata table. The table records customer visits by year and month for small kiosk stores:

CREATE TABLE TESTSCHEMA.kiosk_monthly_visits (
  kiosk_id INTEGER
, year_no INTEGER
, jan_visits INTEGER
, feb_visits INTEGER
, mar_visits INTEGER
, apr_visits INTEGER
, may_visits INTEGER
, jun_visits INTEGER
, jul_visits INTEGER
, aug_visits INTEGER
, sep_visits INTEGER
, oct_visits INTEGER
, nov_visits INTEGER
, dec_visits INTEGER)
PRIMARY INDEX (kiosk_id);

We insert some sample data into the table:

INSERT INTO testschema.kiosk_monthly_visits VALUES (100, 2020, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900, 2000, 2100, 2200);

Next, we create a view that unpivots the table so that the monthly visits appear on separate rows. The single row in the pivoted table creates 12 rows in the unpivoted table, one row per month.

REPLACE VIEW testschema.unpivot_kiosk_monthly_visits (
  kiosk_id
, year_no
, month_name
, month_visits
)
AS
SELECT 
  kiosk_id
, year_no
, month_name (FORMAT 'X(10)')
, month_visits
FROM TD_UNPIVOT (
 ON (SELECT * FROM testschema.kiosk_monthly_visits)
 USING
 VALUE_COLUMNS ('month_visits')
 UNPIVOT_COLUMN('month_name')
 COLUMN_LIST(
   'jan_visits'
 , 'feb_visits'
 , 'mar_visits'
 , 'apr_visits'
 , 'may_visits'
 , 'jun_visits'
 , 'jul_visits'
 , 'aug_visits'
 , 'sep_visits'
 , 'oct_visits'
 , 'nov_visits'
 , 'dec_visits'
 )
 COLUMN_ALIAS_LIST (
   'jan'
 , 'feb'
 , 'mar'
 , 'apr'
 , 'may'
 , 'jun'
 , 'jul'
 , 'aug'
 , 'sep'
 , 'oct'
 , 'nov'
 , 'dec'
 )
) a;

When you select from the view, the monthly visits are unpivoted into 12 separate rows:

SELECT * FROM testschema.unpivot_kiosk_monthly_visits;

   kiosk_id      year_no  month_name  month_visits
-----------  -----------  ----------  ------------
        100         2020  jan                 1100
        100         2020  feb                 1200
        100         2020  mar                 1300
        100         2020  apr                 1400
        100         2020  may                 1500
        100         2020  jun                 1600
        100         2020  jul                 1700
        100         2020  aug                 1800
        100         2020  sep                 1900
        100         2020  oct                 2000
        100         2020  nov                 2100
        100         2020  dec                 2200

Now we use AWS SCT to convert the view into ANSI SQL that can be run on Amazon Redshift. The conversion creates a common table expression (CTE) to place each month in a separate row. It then joins the CTE and the remaining attributes from the original pivoted table.

REPLACE VIEW testschema.unpivot_kiosk_monthly_visits (kiosk_id, year_no, month_name, month_visits) AS
WITH cols
AS (SELECT
    'jan' AS col
UNION ALL
SELECT
    'feb' AS col
UNION ALL
SELECT
    'mar' AS col
UNION ALL
SELECT
    'apr' AS col
UNION ALL
SELECT
    'may' AS col
UNION ALL
SELECT
    'jun' AS col
UNION ALL
SELECT
    'jul' AS col
UNION ALL
SELECT
    'aug' AS col
UNION ALL
SELECT
    'sep' AS col
UNION ALL
SELECT
    'oct' AS col
UNION ALL
SELECT
    'nov' AS col
UNION ALL
SELECT
    'dec' AS col)
SELECT
    t1.kiosk_id, t1.year_no, col AS "month_name",
    CASE col
        WHEN 'jan' THEN "jan_visits"
        WHEN 'feb' THEN "feb_visits"
        WHEN 'mar' THEN "mar_visits"
        WHEN 'apr' THEN "apr_visits"
        WHEN 'may' THEN "may_visits"
        WHEN 'jun' THEN "jun_visits"
        WHEN 'jul' THEN "jul_visits"
        WHEN 'aug' THEN "aug_visits"
        WHEN 'sep' THEN "sep_visits"
        WHEN 'oct' THEN "oct_visits"
        WHEN 'nov' THEN "nov_visits"
        WHEN 'dec' THEN "dec_visits"
        ELSE NULL
    END AS "month_visits"
    FROM testschema.kiosk_monthly_visits AS t1
    CROSS JOIN cols
    WHERE month_visits IS NOT NULL;

You can check that the converted view produces the same result as the Teradata version:

# SELECT * FROM testschema.unpivot_kiosk_monthly_visits;
 kiosk_id | year_no | month_name | month_visits 
      100 |    2020 | oct        |        2000
      100 |    2020 | nov        |        2100
      100 |    2020 | jul        |        1700
      100 |    2020 | feb        |        1200
      100 |    2020 | apr        |        1400
      100 |    2020 | aug        |        1800
      100 |    2020 | sep        |        1900
      100 |    2020 | jan        |        1100
      100 |    2020 | mar        |        1300
      100 |    2020 | may        |        1500
      100 |    2020 | jun        |        1600
      100 |    2020 | dec        |        2200

You can try out the conversion support for TD_UNPIVOT in the latest version of AWS SCT.

QUANTILE function

QUANTILE is a ranking function. It partitions the input set into a specified number of groups, each containing an equal portion of the total population. QUANTILE is a proprietary Teradata extension of the NTILE function found in ANSI SQL.

For example, we can compute the quartiles of the monthly visit data using the following Teradata view:

REPLACE VIEW testschema.monthly_visit_rank AS
SELECT
  kiosk_id
, year_no
, month_name
, month_visits
, QUANTILE(4, month_visits) qtile
FROM
 testschema.unpivot_kiosk_monthly_visits
;

When you select from the view, the QUANTILE function computes the quartile and applies it as an attribute on the output:

SELECT * FROM monthly_visit_rank;

   kiosk_id      year_no  month_name  month_visits        qtile
-----------  -----------  ----------  ------------  -----------
        100         2020  jan                 1100            0
        100         2020  feb                 1200            0
        100         2020  mar                 1300            0
        100         2020  apr                 1400            1
        100         2020  may                 1500            1
        100         2020  jun                 1600            1
        100         2020  jul                 1700            2
        100         2020  aug                 1800            2
        100         2020  sep                 1900            2
        100         2020  oct                 2000            3
        100         2020  nov                 2100            3
        100         2020  dec                 2200            3

Amazon Redshift supports a generalized NTILE function, which can implement QUANTILE, and is ANSI-compliant. We’ve enhanced AWS SCT to automatically convert QUANTILE function calls to equivalent NTILE function calls.

For example, when you convert the preceding Teradata view, AWS SCT produces the following Amazon Redshift code:

SELECT 
  unpivot_kiosk_monthly_visits.kiosk_id
, unpivot_kiosk_monthly_visits.year_no
, unpivot_kiosk_monthly_visits.month_name
, unpivot_kiosk_monthly_visits.month_visits
, (ntile(4) OVER (ORDER BY unpivot_kiosk_monthly_visits.month_visits ASC NULLS FIRST) - 1) AS qtile 
FROM 
  testschema.unpivot_kiosk_monthly_visits
;

QUANTILE conversion support is available now in AWS SCT.

QUALIFY filter

The QUALIFY clause in Teradata filters rows produced by an analytic function. Let’s look at an example. We use the following table, which contains store revenue by month. Our goal is to find the top five months by revenue:

CREATE TABLE testschema.sales (
  store_id INTEGER
, month_no INTEGER
, sales_amount DECIMAL(9,2))
PRIMARY INDEX (store_id);


SELECT * FROM sales;

   store_id     month_no  sales_amount
-----------  -----------  ------------
       1001            1      35000.00
       1001            2      40000.00
       1001            3      45000.00
       1001            4      25000.00
       1001            5      30000.00
       1001            6      30000.00
       1001            7      50000.00
       1001            8      35000.00
       1001            9      60000.00
       1001           10      80000.00
       1001           11      90000.00
       1001           12     100000.00

The data shows that July, September, October, November, and December were the top five sales months.

We create a view that uses the RANK function to rank each month by sales, then use the QUALIFY function to select the top five months:

REPLACE VIEW testschema.top_five_months(
  store_id
, month_no
, sales_amount
, month_rank
) as
SELECT
  store_id
, month_no
, sales_amount
, RANK() OVER (PARTITION BY store_id ORDER BY sales_amount DESC) month_rank
FROM
  testschema.sales
QUALIFY RANK() OVER (PARTITION by store_id ORDER BY sales_amount DESC) <= 5
;

Before, if you used the QUALIFY clause, you had to manually recode your SQL statements. Now, AWS SCT automatically converts QUALIFY into Amazon Redshift-compatible, ANSI-compliant SQL. For example, AWS SCT rewrites the preceding view as follows:

CREATE OR REPLACE VIEW testschema.top_five_months (
  store_id
, month_no
, sales_amount
, month_rank) AS
SELECT
  qualify_subquery.store_id
, qualify_subquery.month_no
, qualify_subquery.sales_amount
, month_rank
FROM (
  SELECT
    store_id
  , month_no
  , sales_amount
  , rank() OVER (PARTITION BY store_id ORDER BY sales_amount DESC NULLS FIRST) AS month_rank
  , rank() OVER (PARTITION BY store_id ORDER BY sales_amount DESC NULLS FIRST) AS qualify_expression_1
  FROM testschema.sales) AS qualify_subquery
  WHERE 
    qualify_expression_1 <= 5;

AWS SCT converts the original query into a subquery, and applies the QUALIFY expression as a filter on the subquery. AWS SCT adds an additional column to the subquery for the purpose of filtering. This is not strictly needed, but simplifies the code when column aliases aren’t used.

You can try QUALIFY conversion in the latest version of AWS SCT.

Summary

We’re happy to share these new features with you. If you’re contemplating a migration to Amazon Redshift, these capabilities can help automate your schema conversion and preserve your investment in existing reports and applications. If you’re looking to get started on a data warehouse migration, you can learn more about Amazon Redshift and AWS SCT from our public documentation.

This post described a few of the dozens of new features we’re introducing to automate your Teradata migrations to Amazon Redshift. We’ll share more in upcoming posts about automation for proprietary Teradata features and other exciting new capabilities.

Check back soon for more information. Until then, you can learn more about Amazon Redshift and the AWS Schema Conversion Tool. Happy migrating!


About the Authors

Michael Soo is a Senior Database Engineer with the AWS Database Migration Service team. He builds products and services that help customers migrate their database workloads to the AWS cloud.

Raza Hafeez is a Data Architect within the Lake House Global Specialty Practice of AWS Professional Services. He has over 10 years of professional experience building and optimizing enterprise data warehouses and is passionate about enabling customers to realize the power of their data. He specializes in migrating enterprise data warehouses to AWS Lake House Architecture.

Po Hong, PhD, is a Principal Data Architect of Lake House Global Specialty Practice, AWS Professional Services. He is passionate about supporting customers to adopt innovative solutions to reduce time to insight. Po is specialized in migrating large scale MPP on-premises data warehouses to the AWS Lake House architecture.

Entong Shen is a Software Development Manager of Amazon Redshift. He has been working on MPP databases for over 9 years and has focused on query optimization, statistics and migration related SQL language features such as stored procedures and data types.

Sumit Singh is a database engineer with the Database Migration Service team at Amazon Web Services. He works closely with customers and provides technical assistance to migrate their on-premises workloads to the AWS Cloud. He also assists in continuously improving the quality and functionality of AWS data migration products.

Nelly Susanto is a Senior Database Migration Specialist of AWS Database Migration Accelerator. She has over 10 years of technical background focusing on migrating and replicating databases along with data-warehouse workloads. She is passionate about helping customers in their cloud journey.

Introducing Amazon MSK Connect – Stream Data to and from Your Apache Kafka Clusters Using Managed Connectors

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/

Apache Kafka is an open-source platform for building real-time streaming data pipelines and applications. At re:Invent 2018, we announced Amazon Managed Streaming for Apache Kafka, a fully managed service that makes it easy to build and run applications that use Apache Kafka to process streaming data.

When you use Apache Kafka, you capture real-time data from sources such as IoT devices, database change events, and website clickstreams, and deliver it to destinations such as databases and persistent storage.

Kafka Connect is an open-source component of Apache Kafka that provides a framework for connecting with external systems such as databases, key-value stores, search indexes, and file systems. However, manually running Kafka Connect clusters requires you to plan and provision the required infrastructure, deal with cluster operations, and scale it in response to load changes.

Today, we’re announcing a new capability that makes it easier to manage Kafka Connect clusters. MSK Connect allows you to configure and deploy a connector using Kafka Connect with a just few clicks. MSK Connect provisions the required resources and sets up the cluster. It continuously monitors the health and delivery state of connectors, patches and manages the underlying hardware, and auto-scales connectors to match changes in throughput. As a result, you can focus your resources on building applications rather than managing infrastructure.

MSK Connect is fully compatible with Kafka Connect, which means you can migrate your existing connectors without code changes. You don’t need an MSK cluster to use MSK Connect. It supports Amazon MSK, Apache Kafka, and Apache Kafka-compatible clusters as sources and sinks. These clusters can be self-managed or managed by AWS partners and third parties, as long as MSK Connect can privately connect to the clusters.

Using MSK Connect with Amazon Aurora and Debezium
To test MSK Connect, I want to use it to stream data change events from one of my databases. To do so, I use Debezium, an open-source distributed platform for change data capture built on top of Apache Kafka.

I use a MySQL-compatible Amazon Aurora database as the source and the Debezium MySQL connector with the setup described in this architectural diagram:

Architectural diagram.

To use my Aurora database with Debezium, I need to turn on binary logging in the DB cluster parameter group. I follow the steps in the How do I turn on binary logging for my Amazon Aurora MySQL cluster article.
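
If you prefer to script this step instead of using the console, the parameter change can also be applied through the AWS SDK. The following is a minimal sketch using boto3; the parameter group name is a placeholder for your own custom DB cluster parameter group (the default group can’t be modified), and a reboot of the writer instance is needed before the new value takes effect:

import boto3

rds = boto3.client("rds")

# Set binlog_format to ROW so Debezium can read row-level change events.
# "my-aurora-cluster-params" is a hypothetical custom DB cluster parameter
# group that is already attached to the Aurora cluster.
rds.modify_db_cluster_parameter_group(
    DBClusterParameterGroupName="my-aurora-cluster-params",
    Parameters=[
        {
            "ParameterName": "binlog_format",
            "ParameterValue": "ROW",
            "ApplyMethod": "pending-reboot",
        }
    ],
)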

Next, I have to create a custom plugin for MSK Connect. A custom plugin is a set of JAR files that contain the implementation of one or more connectors, transforms, or converters. Amazon MSK will install the plugin on the workers of the connect cluster where the connector is running.

From the Debezium website, I download the MySQL connector plugin for the latest stable release. Because MSK Connect accepts custom plugins in ZIP or JAR format, I convert the downloaded archive to ZIP format and keep the JAR files in the main directory:

$ tar xzf debezium-connector-mysql-1.6.1.Final-plugin.tar.gz
$ cd debezium-connector-mysql
$ zip -9 ../debezium-connector-mysql-1.6.1.zip *
$ cd ..

Then, I use the AWS Command Line Interface (CLI) to upload the custom plugin to an Amazon Simple Storage Service (Amazon S3) bucket in the same AWS Region I am using for MSK Connect:

$ aws s3 cp debezium-connector-mysql-1.6.1.zip s3://my-bucket/path/

On the Amazon MSK console there is a new MSK Connect section. I look at the connectors and choose Create connector. Then, I create a custom plugin and browse my S3 buckets to select the custom plugin ZIP file I uploaded before.

Console screenshot.

I enter a name and a description for the plugin and then choose Next.

Console screenshot.

Now that the configuration of the custom plugin is complete, I start the creation of the connector. I enter a name and a description for the connector.

Console screenshot.

I have the option to use a self-managed Apache Kafka cluster or one that is managed by MSK. I select one of my MSK clusters, which is configured to use IAM authentication. The MSK cluster I select is in the same virtual private cloud (VPC) as my Aurora database. To connect, the MSK cluster and Aurora database use the default security group for the VPC. For simplicity, I use a cluster configuration with auto.create.topics.enable set to true.

Console screenshot.

In Connector configuration, I use the following settings:

connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=<aurora-database-writer-instance-endpoint>
database.port=3306
database.user=my-database-user
database.password=my-secret-password
database.server.id=123456
database.server.name=ecommerce-server
database.include.list=ecommerce
database.history.kafka.topic=dbhistory.ecommerce
database.history.kafka.bootstrap.servers=<bootstrap servers>
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=AWS_MSK_IAM
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
include.schema.changes=true

Some of these settings are generic and should be specified for any connector. For example:

  • connector.class is the Java class of the connector.
  • tasks.max is the maximum number of tasks that should be created for this connector.

Other settings are specific to the Debezium MySQL connector:

  • The database.hostname contains the writer instance endpoint of my Aurora database.
  • The database.server.name is a logical name of the database server. It is used for the names of the Kafka topics created by Debezium.
  • The database.include.list contains the list of databases hosted by the specified server.
  • The database.history.kafka.topic is a Kafka topic used internally by Debezium to track database schema changes.
  • The database.history.kafka.bootstrap.servers contains the bootstrap servers of the MSK cluster.
  • The final eight lines (database.history.consumer.* and database.history.producer.*) enable IAM authentication to access the database history topic.

In Connector capacity, I can choose between autoscaled or provisioned capacity. For this setup, I choose Autoscaled and leave all other settings at their defaults.

Console screenshot.

With autoscaled capacity, I can configure these parameters:

  • MSK Connect Unit (MCU) count per worker – Each MCU provides 1 vCPU of compute and 4 GB of memory.
  • The minimum and maximum number of workers.
  • Autoscaling utilization thresholds – The upper and lower target utilization thresholds on MCU consumption in percentage to trigger auto scaling.

Console screenshot.

There is a summary of the minimum and maximum MCUs, memory, and network bandwidth for the connector.

Console screenshot.

For Worker configuration, you can use the default one provided by Amazon MSK or provide your own configuration. In my setup, I use the default one.

In Access permissions, I create an IAM role. In the trusted entities, I add kafkaconnect.amazonaws.com to allow MSK Connect to assume the role.

The role is used by MSK Connect to interact with the MSK cluster and other AWS services. For my setup, I add permissions to access the MSK cluster and to write to the CloudWatch Logs log group that I use later for the connector logs.

The Debezium connector needs access to the cluster configuration to find the replication factor to use to create the history topic. For this reason, I add the kafka-cluster:DescribeClusterDynamicConfiguration action (equivalent to Apache Kafka’s DESCRIBE_CONFIGS cluster ACL) to the permissions policy.

Depending on your configuration, you might need to add more permissions to the role (for example, in case the connector needs access to other AWS resources such as an S3 bucket). If that is the case, you should add permissions before creating the connector.
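
For reference, here is a minimal boto3 sketch of how such a role could be created. The role and policy names are hypothetical, the statements only cover what is discussed in this post (cluster-level access including kafka-cluster:DescribeClusterDynamicConfiguration, plus CloudWatch Logs delivery), and the wide-open resources are for brevity only; scope them to your cluster, topic, and log group ARNs in a real setup:

import json
import boto3

iam = boto3.client("iam")

# Trust policy so MSK Connect can assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "kafkaconnect.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

# Permissions: MSK cluster access for the connector (including the
# DescribeClusterDynamicConfiguration action mentioned above) and
# CloudWatch Logs delivery for the connector logs.
permissions_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:DescribeClusterDynamicConfiguration",
                "kafka-cluster:CreateTopic",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:ReadData",
                "kafka-cluster:WriteData",
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup",
            ],
            "Resource": "*",
        },
        {
            "Effect": "Allow",
            "Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
            "Resource": "*",
        },
    ],
}

iam.create_role(
    RoleName="msk-connect-debezium-role",  # hypothetical name
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)
iam.put_role_policy(
    RoleName="msk-connect-debezium-role",
    PolicyName="msk-connect-debezium-policy",  # hypothetical name
    PolicyDocument=json.dumps(permissions_policy),
)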

In Security, the settings for authentication and encryption in transit are taken from the MSK cluster.

Console screenshot.

In Logs, I choose to deliver logs to CloudWatch Logs to have more information on the execution of the connector. By using CloudWatch Logs, I can easily manage retention and interactively search and analyze my log data with CloudWatch Logs Insights. I enter the log group ARN (it’s the same log group I used before in the IAM role) and then choose Next.

Console screenshot.

I review the settings and then choose Create connector. After a few minutes, the connector is running.

Testing MSK Connect with Amazon Aurora and Debezium
Now let’s test the architecture I just set up. I start an Amazon Elastic Compute Cloud (Amazon EC2) instance to update the database and start a couple of Kafka consumers to see Debezium in action. To be able to connect to both the MSK cluster and the Aurora database, I use the same VPC and assign the default security group. I also add another security group that gives me SSH access to the instance.

I download a binary distribution of Apache Kafka and extract the archive in the home directory:

$ tar xvf kafka_2.13-2.7.1.tgz

To use IAM to authenticate with the MSK cluster, I follow the instructions in the Amazon MSK Developer Guide to configure clients for IAM access control. I download the latest stable release of the Amazon MSK Library for IAM:

$ wget https://github.com/aws/aws-msk-iam-auth/releases/download/1.1.0/aws-msk-iam-auth-1.1.0-all.jar

In the ~/kafka_2.13-2.7.1/config/ directory I create a client-config.properties file to configure a Kafka client to use IAM authentication:

# Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL

# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM

# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;

# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler

I add a few lines to my Bash profile to:

  • Add Kafka binaries to the PATH.
  • Add the MSK Library for IAM to the CLASSPATH.
  • Create the BOOTSTRAP_SERVERS environment variable to store the bootstrap servers of my MSK cluster.
$ cat >> ~/.bash_profile
export PATH=~/kafka_2.13-2.7.1/bin:$PATH
export CLASSPATH=/home/ec2-user/aws-msk-iam-auth-1.1.0-all.jar
export BOOTSTRAP_SERVERS=<bootstrap servers>

Then, I open three terminal connections to the instance.

In the first terminal connection, I start a Kafka consumer for a topic with the same name as the database server (ecommerce-server). This topic is used by Debezium to stream schema changes (for example, when a new table is created).

$ cd ~/kafka_2.13-2.7.1/
$ kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVERS \
                            --consumer.config config/client-config.properties \
                            --topic ecommerce-server --from-beginning

In the second terminal connection, I start another Kafka consumer for a topic with a name built by concatenating the database server (ecommerce-server), the database (ecommerce), and the table (orders). This topic is used by Debezium to stream data changes for the table (for example, when a new record is inserted).

$ cd ~/kafka_2.13-2.7.1/
$ kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVERS \
                            --consumer.config config/client-config.properties \
                            --topic ecommerce-server.ecommerce.orders --from-beginning

In the third terminal connection, I install a MySQL client using the MariaDB package and connect to the Aurora database:

$ sudo yum install mariadb
$ mysql -h <aurora-database-writer-instance-endpoint> -u <database-user> -p

From this connection, I create the ecommerce database and a table for my orders:

CREATE DATABASE ecommerce;

USE ecommerce

CREATE TABLE orders (
       order_id VARCHAR(255),
       customer_id VARCHAR(255),
       item_description VARCHAR(255),
       price DECIMAL(6,2),
       order_date DATETIME DEFAULT CURRENT_TIMESTAMP
);

These database changes are captured by the Debezium connector managed by MSK Connect and are streamed to the MSK cluster. In the first terminal, consuming the topic with schema changes, I see the information on the creation of the database and table:

Struct{source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202831473,db=ecommerce,server_id=1980402433,file=mysql-bin-changelog.000003,pos=9828,row=0},databaseName=ecommerce,ddl=CREATE DATABASE ecommerce,tableChanges=[]}
Struct{source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202878811,db=ecommerce,table=orders,server_id=1980402433,file=mysql-bin-changelog.000003,pos=10002,row=0},databaseName=ecommerce,ddl=CREATE TABLE orders ( order_id VARCHAR(255), customer_id VARCHAR(255), item_description VARCHAR(255), price DECIMAL(6,2), order_date DATETIME DEFAULT CURRENT_TIMESTAMP ),tableChanges=[Struct{type=CREATE,id="ecommerce"."orders",table=Struct{defaultCharsetName=latin1,primaryKeyColumnNames=[],columns=[Struct{name=order_id,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=latin1,length=255,position=1,optional=true,autoIncremented=false,generated=false}, Struct{name=customer_id,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=latin1,length=255,position=2,optional=true,autoIncremented=false,generated=false}, Struct{name=item_description,jdbcType=12,typeName=VARCHAR,typeExpression=VARCHAR,charsetName=latin1,length=255,position=3,optional=true,autoIncremented=false,generated=false}, Struct{name=price,jdbcType=3,typeName=DECIMAL,typeExpression=DECIMAL,length=6,scale=2,position=4,optional=true,autoIncremented=false,generated=false}, Struct{name=order_date,jdbcType=93,typeName=DATETIME,typeExpression=DATETIME,position=5,optional=true,autoIncremented=false,generated=false}]}}]}

Then, I go back to the database connection in the third terminal to insert a few records in the orders table:

INSERT INTO orders VALUES ("123456", "123", "A super noisy mechanical keyboard", "50.00", "2021-08-16 10:11:12");
INSERT INTO orders VALUES ("123457", "123", "An extremely wide monitor", "500.00", "2021-08-16 11:12:13");
INSERT INTO orders VALUES ("123458", "123", "A too sensible microphone", "150.00", "2021-08-16 12:13:14");

In the second terminal, I see the information on the records inserted into the orders table:

Struct{after=Struct{order_id=123456,customer_id=123,item_description=A super noisy mechanical keyboard,price=50.00,order_date=1629108672000},source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202993000,db=ecommerce,table=orders,server_id=1980402433,file=mysql-bin-changelog.000003,pos=10464,row=0},op=c,ts_ms=1629202993614}
Struct{after=Struct{order_id=123457,customer_id=123,item_description=An extremely wide monitor,price=500.00,order_date=1629112333000},source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202993000,db=ecommerce,table=orders,server_id=1980402433,file=mysql-bin-changelog.000003,pos=10793,row=0},op=c,ts_ms=1629202993621}
Struct{after=Struct{order_id=123458,customer_id=123,item_description=A too sensible microphone,price=150.00,order_date=1629115994000},source=Struct{version=1.6.1.Final,connector=mysql,name=ecommerce-server,ts_ms=1629202993000,db=ecommerce,table=orders,server_id=1980402433,file=mysql-bin-changelog.000003,pos=11114,row=0},op=c,ts_ms=1629202993630}

My change data capture architecture is up and running and the connector is fully managed by MSK Connect.

Availability and Pricing
MSK Connect is available in the following AWS Regions: Asia Pacific (Mumbai), Asia Pacific (Seoul), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), Canada (Central), EU (Frankfurt), EU (Ireland), EU (London), EU (Paris), EU (Stockholm), South America (Sao Paulo), US East (N. Virginia), US East (Ohio), US West (N. California), US West (Oregon). For more information, see the AWS Regional Services List.

With MSK Connect you pay for what you use. The resources used by your connectors can be scaled automatically based on your workload. For more information, see the Amazon MSK pricing page.

Simplify the management of your Apache Kafka connectors today with MSK Connect.

Danilo

Discovering what’s slowing down your website with Web Analytics

Post Syndicated from Joao Sousa Botto original https://blog.cloudflare.com/web-analytics-vitals-explorer/

Web Analytics is Cloudflare’s privacy-focused real user measurement solution. It leverages a lightweight JavaScript beacon and does not use any client-side state, such as cookies or localStorage, to collect usage metrics. Nor does it “fingerprint” individuals via their IP address, User Agent string, or any other data.

Cloudflare Web Analytics makes essential web analytics, such as the top-performing pages on your website and top referrers, available to everyone for free, and it’s becoming more powerful than ever.

Focusing on Performance

Earlier this year we merged Web Analytics with our Browser Insights product, which enabled customers proxying their websites through Cloudflare to evaluate visitors’ experience on their web properties through Core Web Vitals such as Largest Contentful Paint (LCP) and First Input Delay (FID).

It was important to bring the Core Web Vitals performance measurements into Web Analytics given the outsized impact that page load times have on bounce rates. A page load time increase from 1s to 3s increases bounce rates by 32%, and an increase from 1s to 6s increases them by 106% (source).

Now that you know the impact a slow-loading web page can have on your visitors, it’s time for us to make it a no-brainer to take action. Read on.

Becoming Action-Oriented

We believe that, to deliver the most value to our users, the product should facilitate the following process:

  1. Measure the real user experience
  2. Grade this experience — is it satisfactory or in need of improvement?
  3. Provide actionable insights — what part of the web page should be tweaked to improve the user experience?
  4. Repeat
Discovering what’s slowing down your website with Web Analytics

And it all starts with Web Analytics Vitals Explorer, which started rolling out today.

Introducing Web Analytics Vitals Explorer

Vitals Explorer enables you to easily pinpoint which elements on your pages are affecting users the most, with accurate measurements from the visitors’ perspective and an easy-to-read impact grading.

To do that, we have automatically updated the Web Analytics JavaScript beacon so that it collects the relevant vital measurements from the browser. As always, we are not collecting any information that would invade your visitors’ privacy.

Usage

Once this new beacon is updated on your sites — and again, the update will happen transparently to you — you can then navigate to the Core Web Vitals page on Web Analytics. When entering that page, you will see three graphs grading the user experience for Largest Contentful Paint (LCP), First Input Delay (FID), and Cumulative Layout Shift (CLS). Below each graph you can see the debug section with the top five elements that have a negative impact on the metric. Lastly, when clicking on any of these elements shown in the data table, you will be presented with its impact and exact paths so that you can easily decide whether this is worth keeping on your website in its current format.

Discovering what’s slowing down your website with Web Analytics

In addition to this new Core Web Vitals content, we have also added First Paint and First Contentful Paint to the Page Load Time page. When you navigate to this page you will now see the page load summary and a graph representing page load timing. These will allow you to quickly identify any regressions to these important performance metrics.

Discovering what’s slowing down your website with Web Analytics

Measurement details

This additional debugging information for Core Web Vitals is measured during the lifespan of the page (until the user leaves the tab or closes the browser window, which updates visibilityState to a hidden state).

Here’s what we collect:

Common for all Core Web Vitals

  • Element is a CSS selector representing the DOM node. With this string, you can run `document.querySelector(<element_name>)` in your browser’s dev console to find out which DOM node has a negative impact on your scores/values.
  • Path is the URL path at the time the Core Web Vitals are captured.
  • Value is the metric value for each Core Web Vitals. This value is in milliseconds for LCP or FID and a score for CLS (Cumulative Layout Shift).

Largest Contentful Paint

  • URL is the source URL (such as image, text, web fonts).
  • Size is the source object’s size in bytes.

First Input Delay

  • Name is the type of event (such as mousedown, keydown, pointerdown).

Cumulative Layout Shift

Layout information is a JSON value that includes width, height, x-axis position, y-axis position, left, right, top, and bottom. By comparing these values, you can observe the layout shifts that happen on the page.

  • CurrentRect is the largest source element’s layout information after the shift. This JSON value is shown as Current under the Layout Shifts section in the Web Analytics UI.
  • PreviousRect is the largest source element’s layout information before the shift. This JSON value is shown as Previous under the Layout Shifts section in the Web Analytics UI.

Paint Timings

Additionally, we have added two important paint timings:

  • First Paint is the time between navigation and when the browser renders the first pixels to the screen.
  • First Contentful Paint is the time when the browser renders the first bit of content from the DOM.

A lot of this is based on standard browser measurements, which you can read about in detail on this blog post from Google.

Moving forward

And we are by no means done. Moving forward, we will bring this structured approach with grading and actionable insights into as many Web Analytics measurements as possible, and keep guiding you through how to improve your visitors’ experience. So stay tuned.

In the meantime, do let us know what you think about this feature and ask questions on the community forums.

How we built Instant Logs

Post Syndicated from Ben Yule original https://blog.cloudflare.com/how-we-built-instant-logs/

As a developer, you may be all too familiar with the stress of responding to a major service outage, becoming aware of an ongoing security breach, or simply dealing with the frustration of setting up a new service for the first time. When confronted with these situations, you want a real-time view into the events flowing through your network, so you can receive, process, and act on information as quickly as possible.

If you have a UNIX mindset, you’ll be familiar with tailing web service logs and searching for patterns using grep. With distributed systems like Cloudflare’s edge network, this task becomes much more complex because you’ll either need to log in to thousands of servers, or ship all the logs to a single place.

This is why we built Instant Logs. Instant Logs removes all barriers to accessing your Cloudflare logs, giving you a complete platform to view your HTTP logs in real time, with just a single click, right from within Cloudflare’s dashboard. Powerful filters then let you drill into specific events or search for patterns, and act on them instantly.

The Challenge

Today, Cloudflare’s Logpush product already gives customers the ability to ship their logs to a third-party analytics or storage provider of their choosing. While this system is already exceptionally fast, delivering logs in about 15s on average, it is optimized for completeness and the utmost certainty that your data is reliably making it to its destination. It is the ideal solution for after things have settled down, and you want to perform a forensic deep dive or retrospective.

We originally aimed to extend this system to provide our real-time logging capabilities, but we soon realized the objectives were inherently at odds with each other. In order to get all of your data, to a single place, all the time, the laws of the universe require that latencies be introduced into the system. We needed a complementary solution, with its own unique set of objectives.

This ultimately boiled down to the following:

  1. It has to be extremely fast, in human terms. This means average latencies between an event occurring at the edge and being received by the client should be under three seconds.
  2. We wanted the system design to be simple, and communication to be as direct to the client as possible. This meant operating the dataplane entirely at the edge, eliminating unnecessary round trips to a core data center.
  3. The pipeline needs to provide sensible results on properties of all sizes, ranging from a few requests per day to hundreds of thousands of requests per second.
  4. The pipeline must support a broad set of user-definable filters that are applied before any sampling occurs, such that a user can target and receive exactly what they want.

Workers and Durable Objects

Our existing Logpush pipeline relies heavily on Kafka to provide sharding, buffering, and aggregation at a single, central location. While we’ve had excellent results using Kafka for these pipelines, the clusters are optimized to run only within our core data centers. Using Kafka would require extra hops to far away data centers, adding a latency penalty we were not willing to incur.

In order to keep the data plane running on the edge, we needed primitives that would allow us to perform some of the same key functions we needed out of Kafka. This is where Workers and the recently released Durable Objects come in. Workers provide an incredibly simple-to-use, highly elastic, edge-native compute platform we can use to receive events and perform transformations. Durable Objects, through their global uniqueness, allow us to coordinate messages streaming from thousands of servers and route them to a singular object. This is where aggregation and buffering are performed, before finally pushing to a client over a thin WebSocket. We get all of this, without ever having to leave the edge!

Let’s walk through what this looks like in practice.

A Simple Start

Imagine a simple scenario in which we have a single web server which produces log messages, and a single client which wants to consume them. This can be implemented by creating a Durable Object, which we will refer to as a Durable Session, that serves as the point of coordination between the server and client. In this case, the client initiates a WebSocket connection with the Durable Object, and the server sends messages to the Durable Object over HTTP, which are then forwarded directly to the client.

How we built Instant Logs

This model is quite quick and introduces very little additional latency other than what would be required to send a payload directly from the web server to the client. This is thanks to the fact that Durable Objects are generally located at or near the data center where they are first requested. At least in human terms, it’s instant. Adding more servers to our model is also trivial. As the additional servers produce events, they will all be routed to the same Durable Object, which merges them into a single stream, and sends them to the client over the same WebSocket.

How we built Instant Logs

Durable Objects are inherently single-threaded. As the number of servers in our simple example increases, the Durable Object will eventually saturate its CPU time and start to reject incoming requests. And even if it didn’t, as data volumes increase, we risk overwhelming a client’s ability to download and render log lines. We’ll handle this in a few different ways.

Honing in on specific events

Filtering is the simplest and most obvious way to reduce data volume before it reaches the client. If we can filter out the noise, and stream only the events of interest, we can substantially reduce volume. Performing this transformation in the Durable Object itself will provide no relief from CPU saturation concerns. Instead, we can push this filtering out to an invoking Worker, which will run many filter operations in parallel, as it elastically scales to process all the incoming requests to the Durable Object. At this point, our architecture starts to look a lot like the MapReduce pattern!

How we built Instant Logs

Scaling up with shards

Ok, so filtering may be great in some situations, but it’s not going to save us under all scenarios. We still need a solution to help us coordinate between potentially thousands of servers that are sending events every single second. Durable Objects will come to the rescue, yet again. We can implement a sharding layer consisting of Durable Objects, which we will call Durable Shards, that effectively allows us to reduce the number of requests being sent to our primary object.

How we built Instant Logs

But how do we implement this layer if Durable Objects are globally unique? We first need to decide on a shard key, which is used to determine which Durable Object a given message should first be routed to. When the Worker processes a message, the key will be added to the name of the downstream Durable Object. Assuming our keys are well-balanced, this should effectively reduce the load on the primary Durable Object by approximately 1/N.
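
The routing itself is easy to express. Here is a tiny Python sketch of the idea; the production system runs in Workers, so this is only an illustration, and the names and shard count are made up:

import hashlib

NUM_SHARDS = 8  # assumed shard count; tuned to ingest volume in practice

def shard_object_name(session_id: str, shard_key: str) -> str:
    # Hash the shard key into one of NUM_SHARDS buckets and append the
    # bucket to the downstream Durable Object name, so load on the
    # primary object drops by roughly 1/NUM_SHARDS.
    bucket = int(hashlib.sha256(shard_key.encode()).hexdigest(), 16) % NUM_SHARDS
    return f"{session_id}-shard-{bucket}"

# Messages from different servers spread across the shard objects.
print(shard_object_name("session-abc", "server-0001"))
print(shard_object_name("session-abc", "server-0002"))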

Reaching the moon by sampling

But wait, there’s more to do. Going back to our original product requirements, “The pipeline needs to provide sensible results on properties of all sizes, ranging from a few requests per day to hundreds of thousands of requests per second.” With the system as designed so far, we have the technical headroom to process an almost arbitrary number of logs. However, we’ve done nothing to reduce the absolute volume of messages that need to be processed and sent to the client, and at high log volumes, clients would quickly be overwhelmed. To deliver the interactive, instant user experience customers expect, we need to roll up our sleeves one more time.

This is where our final trick, sampling, comes into play.

Up to this point, when our pipeline saturates, it still makes forward progress by dropping excess data as the Durable Object starts to refuse connections. However, this form of ‘uncontrolled shedding’ is dangerous because it causes us to lose information. When we drop data in this way, we can’t keep a record of the data we dropped, and we cannot infer things about the original shape of the traffic from the messages that we do receive. Instead, we implement a form of ‘controlled’ sampling, which still preserves the statistics, and information about the original traffic.

For Instant Logs, we implement a sampling technique called Reservoir Sampling. Reservoir sampling is a form of dynamic sampling that has this amazing property of letting us pick a specific k number of items from a stream of unknown length n, with a single pass through the data. By buffering data in the reservoir, and flushing it on a short (sub second) time interval, we can output random samples to the client at the maximum data rate of our choosing. Sampling is implemented in both layers of Durable Objects.

How we built Instant Logs

Information about the original traffic shape is preserved by assigning a sample interval to each line, which is equivalent to the number of samples that were dropped for this given sample to make it through, or 1/probability. The actual number of requests can then be calculated by taking the sum of all sample intervals within a time window. This technique adds a slight amount of latency to the pipeline to account for buffering, but enables us to point an event source of nearly any size at the pipeline, and we can expect it will be handled in a sensible, controlled way.
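
To make the mechanics concrete, here is a short Python sketch of reservoir sampling with the sample-interval bookkeeping described above. It is only an illustration (the real implementation runs inside Durable Objects at the edge), and the reservoir size and flush window are made-up numbers:

import random

def reservoir_sample(stream, k):
    # Keep k items chosen uniformly at random from a stream of unknown
    # length, in a single pass (Algorithm R).
    reservoir = []
    seen = 0
    for item in stream:
        seen += 1
        if len(reservoir) < k:
            reservoir.append(item)
        else:
            j = random.randrange(seen)  # replace with probability k / seen
            if j < k:
                reservoir[j] = item
    # Each emitted line carries a sample interval of seen / k, i.e. how many
    # original events it stands for (1 / inclusion probability).
    interval = seen / max(len(reservoir), 1)
    return [(item, interval) for item in reservoir], seen

# One flush window: 100,000 log lines in, at most 50 lines out.
lines = (f"log line {i}" for i in range(100_000))
sampled, total = reservoir_sample(lines, k=50)

# The client estimates the original volume by summing the sample intervals.
estimated_total = sum(interval for _, interval in sampled)
print(total, round(estimated_total))  # both 100000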

Putting it all together

What we are left with is a pipeline that sensibly handles wildly different volumes of traffic, from single digits to hundreds of thousands of requests a second. It allows the user to pinpoint an exact event in a sea of millions, or calculate summaries over every single one. It delivers insight within seconds, all without ever having to do more than click a button.

Best of all? Workers and Durable Objects handle this workload with aplomb and no tuning, and the available developer tooling allowed me to be productive from my first day writing code targeting the Workers ecosystem.

How to get involved?

We’ll be starting our Beta for Instant Logs in a couple of weeks. Join the waitlist to get notified about when you can get access!

If you want to be part of building the future of data at Cloudflare, we’re hiring engineers for our data team in Lisbon, London, Austin, and San Francisco!

Data at Cloudflare just got a lot faster: Announcing Live-updating Analytics and Instant Logs

Post Syndicated from Jon Levine original https://blog.cloudflare.com/instant-logs/

Today, we’re excited to introduce Live-updating Analytics and Instant Logs. For Pro, Business, and Enterprise customers, our analytics dashboards now update live to show you data as it arrives. In addition to this, Enterprise customers can now view their HTTP request logs instantly in the Cloudflare dashboard.

Cloudflare’s data products are essential for our customers’ visibility into their network and applications. Having this data in real time makes it even more powerful — could you imagine trying to navigate using a GPS that showed your location a minute ago? That’s the power of real time data!

Real time data unlocks entirely new use cases for our customers. They can respond to threats and resolve errors as soon as possible, keeping their applications secure and minimising disruption to their end users.

Lightning fast, in-depth analytics

Cloudflare products generate petabytes of log data daily and are designed for scale. To make sense of all this data, we summarize it using analytics — the ability to see time series data, top Ns, and slices and dices of the data generated by Cloudflare products. This allows customers to identify trends and anomalies and drill deep into problems.

We go a step further than just showing you high-level metrics. With Cloudflare Analytics you have the ability to quickly drill down into the most important data — narrow in on a specific time period, then add a chain of filters to slice your data further and see the analytics update to reflect them.

Data at Cloudflare just got a lot faster: Announcing Live-updating Analytics and Instant Logs
Video of Cloudflare analytics showing live updating and drill down capabilities

Let’s say you’re a developer who’s made some recent changes to your website: you’ve deleted some old content and created new web pages. You want to know as soon as possible if these changes have led to any broken links, so you can quickly identify them and make fixes. With live-updating analytics, you can monitor your traffic by status code. If you notice an uptick in 404 errors, add a filter to get details on all 404s and view the top referrers causing the errors. From there, take steps to resolve the problem, whether by creating a redirect page rule or fixing broken links on your own site.

Instant Logs at your fingertips

While Analytics are a great way to see data at an aggregate level, sometimes you need event level information, too. Logs are powerful because they record every single event that flows through a network, so you can figure out what occurred on a granular level.

Our Logpush system is already able to get logs from our global edge network to a customer’s storage destination or analytics provider within seconds. However, setting this up has a lot of overhead and often customers incur long processing times at their destination. We wanted logs to be instant — instant to set up, deliver and take action on.

It’s that easy.

With Instant Logs, customers can actively monitor the traffic that’s flowing through their network and make key decisions that affect their applications now. Real time data unlocks totally new use cases:

  • For Security Engineers: Stop an attack as it’s developing. For example, apply a Firewall rule and see its impact — get answers within seconds. If it’s not what you were intending, try another rule and check again.
  • For Developers: Roll out a config change — to Cloudflare, or to your origin — and have peace of mind as you watch your error rates stay flat (we hope!).

(By the way, if you’re a fan of Workers and want to see real time Workers logging, check out the recently released dashboard for Workers logs.)

Logs at the speed of sight

“Real time” or “instant” can mean different things to different people in different contexts. At Cloudflare, we’re striving to make it as close to the speed of sight as possible. For us, this means we wanted the “glass-to-glass” time — from when you hit “enter” in your browser until when the logs appear — to be under one second.

How did we do?

Today, Cloudflare’s Instant Logs have an average delay of two seconds, and we’re continuing to make improvements to drive that down.

“Real-time” is a very fuzzy term. Looking at other services we see Akamai talking about real-time data as “within minutes” or “latency of 10 minutes”, Amazon talks about “near real-time” for CloudWatch, Google Cloud Logging provides log tailing with a configurable buffer “up to 60 seconds” to deal with potential out-of-order log delivery, and we benchmarked Fastly logs at 25 seconds.

Our goal is to drive down the delay as much as possible (within the laws of physics). We’re happy to have shipped Instant Logs that arrive in two seconds, but we’re not satisfied and will continue to bring that number down.

In time-sensitive scenarios such as an attack or an outage, a few minutes or even 30 seconds of delay can have a big impact on customers. At Cloudflare, our goal is to get our customers’ data into their hands as fast as possible — and we’re just getting started.

How to get access?

Live-updating Analytics is available now on all Pro, Business, and Enterprise plans. Select the “Last 30 minutes” view of your traffic in the Analytics tab to start monitoring your analytics live.

We’ll be starting our Beta for Instant Logs in a couple of weeks. Join the waitlist to get notified about when you can get access!

If you’re eager for details on the inner workings of Instant Logs, check out our blog post about how we built Instant Logs.

What’s next

We’re hard at work to make Instant Logs available for all Enterprise customers — stay tuned after joining our waitlist. We’re also planning to bring all of our datasets to Instant Logs, including Firewall Events. In addition, we’re working on the next set of features like the ability to download logs from your session and compute running aggregates from logs.

For a peek into what we have our sights on next, we know how important it is to perform analysis on not only up-to-date data, but also historical data. We want to give customers the ability to analyze logs, draw insights and perform forensics straight from the Cloudflare platform.

If this sounds cool, we’re hiring engineers for our data team in Lisbon, London and San Francisco — would love to have you help us build the future of data at Cloudflare.

Analyze daily trading activity using transaction data from Amazon Redshift in Amazon FinSpace

Post Syndicated from Mariia Berezina original https://aws.amazon.com/blogs/big-data/analyze-daily-trading-activity-using-transaction-data-from-amazon-redshift-in-amazon-finspace/

Financial services organizations use data from various sources to discover new insights and improve trading decisions. Finding the right dataset and getting access to the data can frequently be a time-consuming process. For example, to analyze daily trading activity, analysts need to find a list of available databases and tables, identify its owner’s contact information, get access, understand the table schema, and load the data. They repeat this process for every additional dataset needed for the analysis.

Amazon FinSpace makes it easy for analysts and quants to discover, analyze, and share data, reducing the time it takes to find and access financial data from months to minutes. To get started, FinSpace admins create a category and an attribute set to capture relevant external reference information such as database type and table name. After connecting to a data source or uploading data directly through the FinSpace user interface (UI), you can create datasets in FinSpace that include schema and other relevant information. Analysts can then search the catalog for necessary datasets and connect to them using the FinSpace web interface or through the FinSpace JupyterLab notebook.

Amazon Redshift is a popular choice for storing and querying exabytes of structured and semi-structured data such as trade transactions. In this post, we explore how to connect to an Amazon Redshift data warehouse from FinSpace through a Spark SQL JDBC connection and populate the FinSpace catalog with metadata such as schema details, dataset owner, and description. We then show how simple it is to use the FinSpace catalog to discover available data and to connect to an Amazon Redshift cluster from a Jupyter notebook in FinSpace to read daily trades for Amazon (AMZN) stock. Finally, we evaluate how well our stock purchases were executed by comparing the transactions stored in Amazon Redshift to the trading history for the stock stored in FinSpace.

Solution overview

The blog post covers the following steps:

  1. Configure your FinSpace catalog to describe your Amazon Redshift tables.
  2. Use FinSpace notebooks to connect to Amazon Redshift.
  3. Populate the FinSpace catalog with tables from Amazon Redshift. Add description, owner, and attributes to each dataset to help with data discovery and access control.
  4. Search the FinSpace catalog for data.
  5. Use FinSpace notebooks to analyze data from both FinSpace and Amazon Redshift to evaluate trade performance based on the daily price for AMZN stock.

The diagram below provides the complete solution overview.

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • Download Jupyter notebooks covering Amazon Redshift dataset import and analysis. Import them into FinSpace by cloning the GitHub repo or by dropping them into FinSpace. The code provided in this blog post should be run from the FinSpace notebooks.
  • Setup a FinSpace environment. For instructions on creating a new environment, see Create an Amazon FinSpace Environment.
  • Install the Capital Markets sample data bundle, as explained in the “Sample Data Bundle” guide.
  • Ensure you have permissions to manage categories and controlled vocabularies and manage attribute sets in FinSpace.
  • Create an Amazon Redshift cluster in the same AWS account as the FinSpace environment. For instructions, see Create a cluster. Additionally, create a superuser and ensure that the cluster is publicly accessible.
  • Create a table in Amazon Redshift and insert trading transaction data using these SQL queries.

Configure your FinSpace catalog to describe your Amazon Redshift tables

FinSpace users can discover relevant datasets by using search or by navigating across categories under the Browse Data menu. Categories allow for cataloging of datasets by commonly used business terms (such as source, data class, type, industry, and so on). An attribute set holds additional metadata for each dataset, including categories and table details to enable you to connect to the data source directly from a FinSpace notebook. Analysts can browse and search attributes to find datasets based on the values assigned to them.

Complete the following steps to create a new subcategory called Redshift under the Source category, and create an attribute set called Redshift Table Attributes. In the following section, we use the subcategory and attribute set to tag datasets from Amazon Redshift. FinSpace users can then browse for the data from the Amazon Redshift source from the Browse Data menu and filter datasets in FinSpace for the tables that are located in the company’s Amazon Redshift data warehouse.

  1. On the FinSpace console, choose Settings (gear icon).
  2. Choose Categories.
  3. Hover over the Source category and choose Edit this Category.

  4. On the Edit Category page, hover over the Source category again and choose Add Sub-Category.
  5. Add Redshift as a source subcategory and Financial data from company's Amazon Redshift data warehouse as the description.

Next, create an attribute set called Redshift Table Attributes to capture additional business context for each dataset.

  1. On the FinSpace console, choose Settings (gear icon).
  2. Choose Attribute Sets.
  3. Choose CREATE ATTRIBUTE SET.
  4. Create a new attribute set called Redshift Table Attributes.
  5. Add the following fields:
    1. Catalog – Data String type
    2. Schema – Data String type
    3. Table – Data String type
    4. Source – Categorization Source type

Use FinSpace notebooks to connect to Amazon Redshift

The notebooks downloaded as part of the prerequisites provide the integration between FinSpace and Amazon Redshift. The steps below explain the code so you can run and extend it as needed.
  1. Connect to the Spark cluster by running the following code:
%local
from aws.finspace.cluster import FinSpaceClusterManager

# if this was already run, no need to run again
if 'finspace_clusters' not in globals():
    finspace_clusters = FinSpaceClusterManager()
    finspace_clusters.auto_connect()
else:
    print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')

After the connection is established, you see a connected to cluster message. It may take 5–8 minutes for the cluster connection to establish.

  2. Add the JDBC driver to Spark jars by running the following code:
%%configure -f
{ "conf":{
          "spark.jars": "https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/2.0.0.7/redshift-jdbc42-2.0.0.7.jar"
         }
}

In this example, we use the latest driver version available (2.0). To download the latest JDBC driver, see Download the Amazon Redshift JDBC driver, version 2.0.

  3. Run cells 1.3–1.4 in the notebook (collapsed to improve readability) to add FinSpace helper classes found in public GitHub examples and to add utility functions.

Python helper classes help with schema and table creation, cluster management, and more. The utility functions help translate Amazon Redshift data to a FinSpace schema.

Next, you update the user group ID that should get access to the datasets, and update the Amazon Redshift connection parameters.

  1. On the FinSpace console, choose Settings (gear icon).
  2. Choose Users and User Groups.
  3. Select a group and copy the group ID from the URL.
  4. On the Amazon Redshift console, open your cluster.
  5. Note the cluster endpoint information from the General information section.
  6. Note your database name, port, and admin user name in the Database configurations section.

If you don’t know your user name or password, contact your Amazon Redshift administrator.

Populate the FinSpace catalog with tables from Amazon Redshift

Now we’re ready to import table metadata from Amazon Redshift into FinSpace. For each table, we create a FinSpace dataset, populate the attribute set we created with the metadata about the table (catalog, schema, table names, and Redshift subcategory for the Source category), and associate the populated attribute set to the created dataset.

  1. Use spark.read to retrieve a list of tables and columns as a Spark DataFrame:
spark.read.format("jdbc").option("driver","com.amazon.redshift.jdbc42.Driver").option("url", urlStr).option("query", Query).load()

As a result, you get two DataFrames, tablesDF and schemaDF, containing a list of tables and associated metadata (database, schema, table names, and comments) as shown in the following screenshot.
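
For completeness, here is a minimal sketch of how urlStr and the two queries could be put together. The endpoint, credentials, and column lists are placeholders, spark is the notebook’s Spark session, and the import notebook in the GitHub repo is the authoritative version:

# Hypothetical connection parameters; replace them with the endpoint,
# database, port, and credentials noted from the Amazon Redshift console.
host = "my-cluster.abc123xyz789.us-east-1.redshift.amazonaws.com"
port = 5439
database = "dev"
user = "awsuser"
password = "my-secret-password"

urlStr = f"jdbc:redshift://{host}:{port}/{database}?user={user}&password={password}"

# One query for the tables and one for their columns, both read from the
# Amazon Redshift system views.
tables_query = """
    SELECT table_catalog, table_schema, table_name, remarks AS comment
    FROM svv_tables
    WHERE table_schema = 'public'
"""
columns_query = """
    SELECT table_catalog, table_schema, table_name, column_name,
           ordinal_position, is_nullable, data_type, remarks AS comment
    FROM svv_columns
    WHERE table_schema = 'public'
"""

def read_query(query):
    return (spark.read.format("jdbc")
            .option("driver", "com.amazon.redshift.jdbc42.Driver")
            .option("url", urlStr)
            .option("query", query)
            .load())

tablesDF = read_query(tables_query)
schemaDF = read_query(columns_query)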

  2. Get the attribute set Redshift Table Attributes that we created earlier by running finspace.attribute_set(att_name). We use its identifiers for populating the metadata for each dataset we create in FinSpace.
# Get the attribute set
sfAttrSet = finspace.attribute_set(att_name)

att_def = None
att_fields = None

# Get the fields of the attribute set
att_resp = finspace.describe_attribute_set(sfAttrSet['id'])

if 'definition' in att_resp: 
    att_def = att_resp['definition']
    
if 'fields' in att_def:
    att_fields = att_def['fields']
  3. Get an ID for the Redshift subcategory to populate the attribute set and identify the datasets with the Amazon Redshift source:
source_cls = finspace.classification('Source')

source_fields = finspace.describe_classification(source_cls['id'])
source_key = None

for n in source_fields['definition']['nodes']:
    if n['fields']['name'] == source_name: 
        source_key = n['key']

# this is the key for source in the Category
print(f'Source: {source_name} Key: {source_key}')

As an output, you get the source_key ID for the Redshift subcategory.

  4. Use list_dataset_metadata_by_taxonomy_node(taxonomyId, source_key) to get the list of existing datasets in FinSpace to avoid duplicating the data if an Amazon Redshift table already exists in FinSpace:
# Get all the datasets from Redshift (classification type Source, with values ‘Redshift’)
resp = finspace.client.list_dataset_metadata_by_taxonomy_node(taxonomyId=source_cls['id'], taxonomyNodeKey=source_key)

# Get a list of datasets to iterate over
datasets = resp['datasetMetadataSummaries']

# Build the lookup table for existing datasets from Redshift to avoid creating duplicates
types_list = []

for s in datasets:

        # end of the arn is the dataset ID
        dataset_id = os.path.basename(s['datasetArn'])

        # get the details of the dataset (name, description, etc)
        dataset_details_resp = finspace.client.describe_dataset_details(datasetId=dataset_id)

        dataset_details = None
        dataset_types   = None
        owner_info = None
        taxonomy_info = None
        
        if 'dataset' in dataset_details_resp:
            dataset_details = dataset_details_resp["dataset"]

        if 'datasetTypeContexts' in dataset_details_resp:
            dataset_types = dataset_details_resp["datasetTypeContexts"]

        if 'ownerinfo' in dataset_details_resp:
            owner_info = dataset_details_resp["ownerinfo"]

        if 'taxonomyNodesinfo' in dataset_details_resp:
            taxonomy_info = dataset_details_resp["taxonomyNodesinfo"]
            
        # Pull Redshift attribute set from the list of dataset_types

        # first check the definition, then extract the values against the definition
        # have the keys of values/labels as the column header?
        for dt in dataset_types:
            if (dt['definition']['name'] != att_name):
                continue

            dd = {
                'dataset_id' : dataset_id
            }

            # used to map the field name (id) to the tile seen in the UI
            field_map = {}

            # get the field titles for name
            for f in dt['definition']['fields']:
                field_map[f['name']] = f['title']

            # human readable, else the keys would be numbers
            for v in dt['values']:
                dd[field_map[v['field']]] = v['values']

            types_list.append(dd)

types_pdf = pd.DataFrame(types_list)

If you already have tables tagged with Redshift as a source, your output looks similar to the following screenshot.

  5. Set permissions and owner details by updating the following code with your desired values:
basicPermissions = [
"ViewDatasetDetails",
"ReadDatasetData",
"AddDatasetData",
"CreateSnapshot",
"EditDatasetMetadata",
"ManageDatasetPermissions",
"DeleteDataset"
]

# All datasets have ownership
basicOwnerInfo = {
"phoneNumber" : "12125551000",
"email" : "[email protected]",
"name" : "Jane Doe"
}
  6. Create a DataFrame with a list of tables in Amazon Redshift to iterate over:
tablesPDF = tablesDF.select('TABLE_CATALOG', 'TABLE_SCHEMA', 'TABLE_NAME', 'COMMENT').toPandas()
  7. Run the following code to:
    1. Check if a table already exists in FinSpace;
    2. If it doesn’t exist, get table’s schema and create an attribute set;
    3. Add the description and the attribute set to the dataset (Catalog, Schema, Table names, and Source).
c = 0
create=True

# For each table, create a dataset with the necessary attribute set populated and associated to the dataset
for index, row in tablesPDF.iterrows():
    
    c = c + 1
        
    catalog = row.TABLE_CATALOG
    schema  = row.TABLE_SCHEMA
    table   = row.TABLE_NAME
    
    # do we already have this dataset?
    exist_i = None
    for ee_i, ee in types_pdf.iterrows():
        if catalog in ee.Catalog:
            if schema in ee.Schema:
                if table in ee.Table:
                    exist_i = ee_i

    if exist_i is not None:
        print(f"Table exists in FinSpace: \n{types_pdf.iloc[[exist_i]]}")
        continue

    # Attributes and their populated values
    att_values = [
        { 'field' : get_field_by_name(att_fields, 'Catalog'), 'type' : get_field_by_name(att_fields, 'Catalog', 'type')['name'], 'values' : [ catalog ] },
        { 'field' : get_field_by_name(att_fields, 'Schema'),  'type' : get_field_by_name(att_fields, 'Schema', 'type')['name'],  'values' : [ schema ] },
        { 'field' : get_field_by_name(att_fields, 'Table'),   'type' : get_field_by_name(att_fields, 'Table', 'type')['name'],   'values' : [ table ] },
        { 'field' : get_field_by_name(att_fields, 'Source'),  'type' : get_field_by_name(att_fields, 'Source', 'type')['name'],  'values' : [ source_key ] },
    ]

    # get this table's schema from Redshift
    tableSchemaPDF = schemaDF.filter(schemaDF.table_name == table).filter(schemaDF.table_schema == schema).select('ORDINAL_POSITION', 'COLUMN_NAME', 'IS_NULLABLE', 'DATA_TYPE', 'COMMENT').orderBy('ORDINAL_POSITION').toPandas()

    print(tableSchemaPDF)
    # translate Redshift schema to FinSpace Schema
    fs_schema = get_finspace_schema(tableSchemaPDF)

    # name and description of the dataset to create
    name = f'{table}'
    description = f'Redshift table from catalog: {catalog}'
    
    if row.COMMENT is not None:
        description = row.COMMENT
    
    print(f'name: {name}')
    print(f'description: {description}')

    print("att_values:")
    for i in att_values:
        print(i)

    print("schema:")
    for i in fs_schema['columns']:
        print(i)
    
    if (create):
        # create the dataset
        dataset_id = finspace.create_dataset(
            name = name,
            description = description,
            permission_group_id = group_id,
            dataset_permissions = basicPermissions,
            kind = "TABULAR",
            owner_info = basicOwnerInfo,
            schema = fs_schema
        )

        print(f'Created, dataset_id: {dataset_id}')

        time.sleep(20)

        # associate the attributes to the dataset
        if (att_name is not None and att_values is not None):
            print(f"Associating values to attribute set: {att_name}")
            finspace.associate_attribute_set(att_name=att_name, att_values=att_values, dataset_id=dataset_id) 

Search the FinSpace catalog for data

Analysts can search for datasets available to them in FinSpace and refine the results using category filters. To analyze our trading activity in the next section, we need to find two datasets: all trades of AMZN stock, and the buy and sell orders from the Amazon Redshift database.

  1. Search for “AMZN” or “US Equity TAQ Sample” to find the “US Equity TAQ Sample – 14 Symbols 6 Months – Sample” dataset provided as part of the Capital Markets Sample Data Bundle.

You can explore the dataset schema and review the attribute set.

  2. Copy the dataset ID and data view ID on the Data View Details page.

We use these IDs in the next section to connect to the data view in FinSpace and analyze our trading activity.

Next, we find the trade_history dataset that we created from the Amazon Redshift table and copy its dataset ID.

  1. On the FinSpace console, choose Source under BROWSE DATA and choose Redshift.
  2. Open the trade_history table.
  3. Copy the dataset ID located in the URL.

Users with permissions to create datasets can also update the dataset with additional information, including a description and owner contact information if those details have changed since the dataset was created in FinSpace.

Use FinSpace notebooks to analyze data from both FinSpace and Amazon Redshift

We’re now ready to analyze the data.

  1. Import the analysis notebook to JupyterLab in FinSpace.
  2. Follow the steps covered in the earlier section, Use FinSpace notebooks to connect to Amazon Redshift, to connect to the FinSpace cluster and add the JDBC driver to the Spark jars, and add the helper and utility functions.
  3. Set up your database connection and date parameters. In this scenario, we analyze trading activity for January 2, 2021.
  4. Connect to Amazon Redshift and query the table directly. Import the data as a Spark DataFrame.
myTrades  = get_dataframe_from_database(dataset_id = dataset_id_db, att_name = db_att_name)

As a result, you get the data stored in the Amazon Redshift database as a Spark DataFrame.

  5. Filter for stock purchase transactions (labeled as P) and calculate an average price paid:
avgPrice = (myTrades.filter( myTrades.trans_date == aDate )
                    .filter(myTrades.trans_type == "P")
                    .select('price')
                    .agg({'price':'avg'}))
  6. Get trading data from the FinSpace Capital Markets dataset:
df = finspace.read_view_as_spark(dataset_id = dataset_id, view_id = view_id)
  7. Apply date, ticker, and trade type filters:
import datetime as dt
import pandas as pd

fTicker = 'AMZN'

pDF = (
    df.filter( df.date == aDate )
    .filter(df.eventtype == "TRADE NB")
    .filter(df.ticker == fTicker)
    .select('price', 'quantity')
).toPandas()
  8. Compare the average purchase price to the daily trading price and plot them to see how close we got to the lowest price.
import matplotlib.pyplot as plt

fig, ax = plt.subplots(1, 1, figsize=(12, 6))

pDF["price"].plot(kind="hist", weights=pDF["quantity"], bins=50, figsize=(12,6))
plt.axvline(x=avgPrice.toPandas().iloc[0, 0], color='red')

# Add labels
plt.title(f"{fTicker} Price Distribution vs Avg Purchase Price")
plt.ylabel('Trades')
plt.xlabel('Price')
plt.subplots_adjust(bottom=0.2)

%matplot plt

As a result, you get a distribution of AMZN stock prices traded on January 2, 2021, which we got from a dataset in FinSpace. The red line in the following graph is the average price we paid for the stock, calculated from the transaction data stored in Amazon Redshift. Although we didn’t pay the highest price traded that day, we performed about average, paying $1,877 per share versus the lowest price of $1,865.

Clean up

If your work with FinSpace or Amazon Redshift is complete, delete the Amazon Redshift cluster or the FinSpace environment to avoid incurring additional fees.

Conclusion

In this post, we reviewed how to connect the Amazon Redshift database and FinSpace in order to create new datasets in FinSpace using the table metadata from Amazon Redshift. We then explored how to look for available data in the FinSpace web app to find two datasets that can help us evaluate how close we got to the best daily price. Finally, we used FinSpace dataset details to import the data into two DataFrames and plot price distribution versus the average price we paid. As a result, we reduced the time it takes to discover and connect to datasets needed for analyzing trading transactions.

Download the import and analysis Jupyter notebooks discussed in this blog post on GitHub.

Visit the FinSpace user guide to learn more about the service, or contact us to discuss FinSpace or Amazon Redshift in more detail.


About the Authors

Mariia Berezina is a Sr. Launch Manager at AWS. She is passionate about building new products to help customers get the most out of data. When not working, she enjoys mentoring women in technology, diving, and traveling the world.

Vincent Saulys is a Principal Solutions Architect at AWS working on FinSpace. Vincent has over 25 years of experience solving some of the world’s most difficult technical problems in the financial services industry. He is a launcher and leader of many mission-critical breakthroughs in data science and technology on behalf of Goldman Sachs, FINRA, and AWS.

Register now for Flink Forward Global, October 26-27, 2021

Post Syndicated from Deepthi Mohan original https://aws.amazon.com/blogs/big-data/register-now-for-flink-forward-global-october-26-27-2021/

Flink Forward Global 2021 is a 2-day virtual conference for the Apache Flink and stream processing communities. Apache Flink is an open-source distributed engine for processing data streams that can support both streaming and batch workloads. Amazon Kinesis Data Analytics is a fully managed service for Apache Flink on AWS that reduces the complexity of building, managing, and integrating Apache Flink applications with other AWS services. You can use Kinesis Data Analytics for Apache Flink to process data from Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, and a variety of data sources for use cases such as streaming ETL (extract, transform, and load), log analysis, event-driven applications, and anomaly and fraud detection in real time.

Flink Forward has keynote presentations and talks on production Flink use cases, technical deep dive sessions, and the growth of the Flink ecosystem. You can meet core Flink committers, new and experienced users, and thought leaders who share experiences and best practices in stream processing, real-time analytics, and the management of mission-critical Flink deployments in production.

AWS is a Platinum sponsor for Flink Forward. If you’re interested in learning about real-time data processing at scale, register now to attend.

Implement anti-money laundering solutions on AWS

Post Syndicated from Yomi Abatan original https://aws.amazon.com/blogs/big-data/implement-anti-money-laundering-solutions-on-aws/

The detection and prevention of financial crime continues to be an important priority for banks. Over the past 10 years, the level of activity in financial crimes compliance in financial services has expanded significantly, with regulators around the globe taking scores of enforcement actions and levying $36 billion in fines. Apart from the fines, the overall cost of compliance for global financial services companies is estimated to have reached $181 billion in 2020. For most banks, know your customer (KYC) and anti-money laundering (AML) constitute the largest area of concern within broader financial crime compliance. In light of this, there is an urgent need for effective AML systems that are scalable and fit for purpose in order to manage the risk of money laundering as well as the risk of non-compliance by banks. Addressing money laundering at a high level covers the following areas:

  • Client screening and identity
  • Transaction monitoring
  • Extended customer risk profile
  • Reporting of suspicious transactions

In this post, we focus on transaction monitoring by looking at the general challenges of implementing transaction monitoring (TM) solutions and how AWS services can be leveraged to build a solution in the cloud from the perspectives of data analytics, risk management, and ad hoc analysis. The following diagram is a conceptual architecture for a transaction monitoring solution on the AWS Cloud.

Current challenges

Due to growing digital channels for facilitating financial transactions, the increasing access to financial services for more people, and the growth in global payments, capturing and processing data related to TM is now considered a big data challenge. The big data challenges and observations include:

  • The volume of data continues to be too expansive for effective processing in a traditional on-premises data center solution.
  • The velocity of banking transactions continues to rise despite the economic challenges of COVID-19.
  • The variety of the data that needs to be processed for TM platforms continues to increase as more data sources with unstructured data become available. These data sources require techniques such as optical character recognition (OCR) and natural language processing (NLP) to automate the process of getting value out of such data without excessive manual effort.
  • Finally, due to the layered nature of complex transactions involved in TM solutions, having data aggregated from multiple financial institutions provides a more comprehensive insight into the flow of financial transactions. Such an aggregation is usually less viable in a traditional on-premises solution.

Data Analytics

The first challenge with implementing TM solutions is having the tools and services to ingest data into a central store (often called a data lake) that is secure and scalable. Not only does this data lake need to capture terabytes or even petabytes of data, but it also needs to facilitate the process of moving data in and out of purpose-built data stores for time series, graph, data marts, and machine learning (ML) processing. In AWS, we refer to a data architecture that covers data lakes, purpose-built data stores, and the data movement across those stores as a lake house architecture.

The following diagram illustrates a TM architecture on the AWS Cloud. This is a more detailed sample architecture of the lake house approach.

Ingestion of data into the lake house typically comes from a client’s data center (if the client is not already on the cloud), from different client AWS accounts that host transaction systems, or from external sources. For clients with transaction systems still on premises, we notice that although several AWS services can be used to transfer data from on premises to the AWS Cloud, a number of our clients with batch requirements utilize AWS Transfer Family, which provides fully managed support for secure file transfers directly into and out of Amazon Simple Storage Service (Amazon S3) or Amazon Elastic File System (Amazon EFS). For real-time requirements, we see the use of Amazon Managed Streaming for Apache Kafka (Amazon MSK), which is a fully managed service that makes it easy for you to build and run applications that use Apache Kafka to process streaming data. One other way to bring in reference data or external data like politically exposed persons (PEP) lists, watch lists, or stop lists for the AML process is via AWS Data Exchange, which makes it easy to find, subscribe to, and use third-party data in the cloud.

In this architecture, the ingestion process always stores the raw data in Amazon S3, which offers industry-leading scalability, data availability, security, and performance. For those clients already on the AWS Cloud, it’s very likely your data is already stored in Amazon S3.

For TM, the ingestion of the data comes from KYC systems, customer account stores, as well as transaction repositories. Data from KYC systems need to have the entity information, which can relate to a company or individual. For the corporate entities, information on the underlying beneficiary owners (UBOs)—the natural persons who directly or indirectly own or control a certain percentage of company—is also required. Before we discuss the data pipeline (the flow of data from the landing zone to the curated data layer) in detail, it’s important to address some of the security and audit requirements of the sensitive data classes typically used in AML processing.

According to Gartner, “Data governance is the specification of decision rights and an accountability framework to ensure the appropriate behavior in the valuation, creation, consumption, and control of data and analytics.” From an AML perspective, the specification and the accountable framework mentioned in this definition requires several enabling components.

The first is a data catalog, which is sometimes grouped into technical, process, and business catalogs. On the AWS platform, this catalog is provided either directly through AWS Glue or indirectly through AWS Lake Formation. Although the catalog implemented by AWS Glue is fundamentally a technical catalog, you can still extend it to add process and business relevant attributes.

The second enabling component is data lineage. This service should be flexible enough to support the different types of data lineage, namely vertical, horizontal, and physical. From an AML perspective, vertical lineage can provide a trace from AML regulation, which requires the collection of certain data classes, all the way to the data models captured in the technical catalog. Horizontal and physical lineage provide a trace of the data from source to eventual suspicious activity reporting for suspected transactions. Horizontal lineage provides lineage at the metadata level, whereas physical lineage captures the trace at the physical level.

The third enabling component of data governance is data security. This covers several aspects of dealing with requirements of encryption of data at rest and in transit, but also de-identification of data during processing. This area requires a range of de-identification techniques depending on the context of use. Some of the techniques include tokenization, encryption, generalization, masking, perturbation, redaction, and even substitution of personally identifiable information (PII) or sensitive data, usually at the attribute level. It’s important to use the right de-identification technique to enforce the right level of privacy while ensuring the data still has sufficient inference signals for use in ML. You can use Amazon Macie, a fully managed data security and data privacy service that uses ML and pattern matching to discover and protect sensitive data, to automate PII discovery prior to applying the right de-identification technique.
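
To make the de-identification idea concrete, the following is a minimal PySpark sketch of attribute-level tokenization, masking, and redaction applied before data reaches the curated layer. The column names, the salted-hash approach, and the placement of the salt are illustrative assumptions, not prescriptions from the architecture above.

# Minimal sketch: attribute-level de-identification of PII columns in a Spark DataFrame.
# Column names and the salt value are illustrative only.
from pyspark.sql import functions as F

SALT = "replace-with-a-secret-from-aws-secrets-manager"   # keep the salt outside the data lake

def deidentify(df):
    return (
        df
        # Tokenization: a deterministic salted hash keeps joins possible without exposing the raw ID
        .withColumn("customer_id_token",
                    F.sha2(F.concat(F.col("customer_id"), F.lit(SALT)), 256))
        # Masking: retain only the last four characters of the account number
        .withColumn("account_number_masked",
                    F.concat(F.lit("****"), F.substring("account_number", -4, 4)))
        # Redaction: drop the raw sensitive attributes entirely
        .drop("customer_id", "account_number")
    )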

Moving data from landing zone (raw data) all the way to curated data involves several steps of processing, including data quality validation, compression, transformation, enrichment, de-duplication, entity resolution, and entity aggregation. Such processing is usually referred to as extract, transform, and load (ETL). In this architecture, we have a choice of using a serverless architecture based on AWS Glue (using Scala or Python programming languages) or implementing Amazon EMR (a cloud big-data platform for processing large datasets using open-source tools such as Apache Spark and Hadoop). Amazon EMR provides the flexibility to run these ETL workloads on Amazon Elastic Compute Cloud (Amazon EC2) instances, Amazon Elastic Kubernetes Service (Amazon EKS) clusters and also on AWS Outposts.

Risk management framework

The risk management framework part of the architecture contains the rules, thresholds, algorithms, models, and control policies that govern the process of detecting and reporting suspicious transactions. Traditionally, most TM solutions have relied solely on rule-based controls to implement AML requirements. However, these rule-based implementations quickly become complex and difficult to maintain, as criminals find new and sophisticated ways to circumvent existing AML controls. Apart from the complexity and maintenance, rule-based approaches usually result in a large number of false positives. False positives in this context are when transactions are flagged as suspicious but turn out not to be. Some of the numbers here are quite remarkable, with some studies revealing that less than 2% of flagged cases actually turn out to be suspicious. The implication of this is the operational costs and the teams of operational resources required to investigate these false positives. Another implication that sometimes gets overlooked is the customer experience: a customer service like payment or clearing of transactions can be delayed or declined due to false positives, which usually leads to a less than satisfactory experience. Despite the number of false positives, AML failings and subsequent fines are hardly out of the news; in one case, the Financial Conduct Authority (FCA) in the United Kingdom decided to take the unprecedented step of bringing criminal proceedings against a bank over failed AML processes.

In light of some of the shortcomings of a rule-based AML approach, a lot of research and focus has been directed by financial services customers, including RegTechs, at applying ML to detect suspicious transactions. One comprehensive study on the use of ML techniques in suspicious transaction detection is a paper published by Z. Chen et al. The paper was published in 2018 (which in ML terms is a lifetime ago), but the concepts and findings are still relevant. It highlights some of the common algorithms and challenges with using ML for AML. AML data is a high-dimensional space that usually requires dimensionality reduction through the use of algorithms like Principal Component Analysis (PCA) or autoencoders (neural networks used to learn efficient data encodings in an unsupervised manner). As part of feature engineering, most algorithms require the value of transactions (debits and credits) aggregated by time intervals—daily, weekly, and monthly. Clustering algorithms like k-means or some variants of k-means are used to create clusters for customer or transaction profiles. There is also the need to deal with the class imbalance usually found in AML datasets.
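
As a rough illustration of that pattern, the following scikit-learn sketch aggregates transaction amounts into time-interval features, reduces the dimensionality with PCA, and clusters customer profiles with k-means. The column names and parameter values are illustrative assumptions, not taken from the paper.

# Minimal sketch: cluster customer transaction profiles with PCA and k-means.
# Column names and parameter values are illustrative.
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

def cluster_profiles(transactions: pd.DataFrame, n_clusters: int = 8) -> pd.Series:
    # Feature engineering: total amount per customer, per day, per transaction type
    # (debit/credit), pivoted into one wide row per customer.
    tx = transactions.assign(day=transactions["txn_date"].dt.date)
    features = tx.pivot_table(index="customer_id",
                              columns=["txn_type", "day"],
                              values="amount",
                              aggfunc="sum",
                              fill_value=0.0)

    # Scale, reduce the high-dimensional space with PCA, then cluster with k-means.
    model = make_pipeline(
        StandardScaler(),
        PCA(n_components=10),
        KMeans(n_clusters=n_clusters, n_init=10, random_state=42),
    )
    labels = model.fit_predict(features)
    return pd.Series(labels, index=features.index, name="cluster")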

All of the algorithms referenced in the Z. Chen et al. paper are supported by Amazon SageMaker. SageMaker is a fully managed ML service that allows data scientists and developers to easily build, train, and deploy ML models for AML. You can also implement some of the other categories of algorithms that support AML, such as behavioral modelling, risk scoring, and anomaly detection, with SageMaker. You can use a wide range of algorithms to address AML challenges, including supervised, semi-supervised, and unsupervised models. Some additional factors that determine the suitability of algorithms include the recall and precision of the models, and the ability to utilize approaches such as SHapley Additive exPlanations (SHAP) and Local Interpretable Model-Agnostic Explanations (LIME) to explain the model output. Amazon SageMaker Clarify can detect bias and increase the transparency of ML models.

Algorithms that focus on risk scoring enable a risk profile that can span across various data classes such as core customer attributes including industry, geography, bank product, business size, complex ownership structure for entities, as well as transactions (debits and credits) and frequency of such transactions. In addition, external data such as PEP lists, various stop lists and watch lists, and in some cases media coverage related to suspected fraud or corruption can also be weighted into a customer’s risk profile.

Rule-based and ML approaches aren’t mutually exclusive, but it’s likely that rules will continue to play a peripheral role as better algorithms are researched and implemented. One of the reasons why the development of algorithms for AML has been sluggish is the limited availability of reliable datasets, which include result data indicating when a correct suspicious activity report (SAR) was filed for a given scenario. Unlike other areas of ML in which findings have been openly shared for further research, with AML, a lot of the progress first appears in commercial products belonging to vendors who are protective of their intellectual property.

Ad hoc analysis and reporting

The final part of the architecture includes support for case or event management tooling and a reporting service for the eventual SAR. These services can be AWS Marketplace solutions or developed from scratch using AWS services such as Amazon EKS or Amazon ECS. This part of the architecture also provides support for a very important aspect of AML: network analytics. Network or link analysis has three main components (a minimal graph-analytics sketch follows this list):

  • Clustering – The construction of graphs and representation of money flow. Amazon Neptune is a fast, reliable, fully managed graph database service that makes it easy to build and run applications that work with highly connected datasets.
  • Statistical analysis – Used to assist with finding metrics around centrality, normality, clustering, and eigenvector centrality.
  • Data visualization – An interactive and extensible data visualization platform to support exploratory data analysis. Findings from the network analytics can also feed into customer risk profiles and supervised ML algorithms.
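
As a rough illustration of the clustering and statistical analysis components, the following sketch builds a small money-flow graph with the open-source networkx library and computes centrality metrics; in a production deployment the edges would come from transaction data and the graph would typically live in Amazon Neptune. The accounts and amounts are made up.

# Minimal sketch: build a money-flow graph and compute centrality metrics
# that could feed a customer risk profile. All accounts and amounts are made up.
import networkx as nx

transactions = [
    # (sender_account, receiver_account, amount)
    ("acct-001", "acct-002", 9500.0),
    ("acct-002", "acct-003", 9400.0),
    ("acct-003", "acct-001", 9300.0),   # a simple round-trip (layering) pattern
    ("acct-004", "acct-002", 120.0),
]

G = nx.DiGraph()
for sender, receiver, amount in transactions:
    if G.has_edge(sender, receiver):
        G[sender][receiver]["amount"] += amount   # accumulate flow on repeated edges
    else:
        G.add_edge(sender, receiver, amount=amount)

# Degree centrality and PageRank (a robust variant of eigenvector centrality)
# highlight accounts that concentrate money flow.
degree = nx.degree_centrality(G)
rank = nx.pagerank(G, weight="amount")

for account in sorted(G.nodes):
    print(f"{account}: degree={degree[account]:.3f}, pagerank={rank[account]:.3f}")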

Conclusion

None of the services or architecture layers described in this post are tightly coupled; different layers and services can be swapped with AWS Marketplace solutions or other FinTech or RegTech solutions that support cloud-based deployment. This means the AWS Cloud has a powerful ecosystem of native services and third-party solutions that can be deployed on the foundation of a Lake House Architecture on AWS to build a modern TM solution in the cloud. To find out more information about key parts of the architecture described in this post, refer to the following resources:

For seeding data into a data lake (including taking advantage of ACID compliance):

For using Amazon EMR for data pipeline processing and some of the recent updates to Amazon EMR:

For taking advantage of SageMaker to support financial crime use cases:

Please contact AWS if you need help developing a full-scale AML solution (covering client screening and identity, transaction monitoring, extended customer risk profile and reporting of suspicious transactions) on AWS.


About the Author

Yomi Abatan is a Sr. Solution Architect based in London, United Kingdom. He works with financial services organisations, architecting, designing, and implementing various large-scale IT solutions. He currently helps established financial services AWS customers embark on digital transformations using the AWS Cloud as an accelerator. Before joining AWS, he worked in various architecture roles with several tier-one investment banks.

Introducing: Custom Hostname Analytics

Post Syndicated from Dina Kozlov original https://blog.cloudflare.com/introducing-custom-hostname-analytics/


In our last blog, we talked about how Cloudflare can help SaaS providers extend the benefits of our network to their customers. Today, we’re excited to announce that SaaS providers will now be able to give their customers visibility into what happens to their traffic when the customer onboards onto the SaaS provider, and inherently, onto the Cloudflare network.

As a SaaS provider, you want to see the analytics about the traffic bound for your service. Use it to see the global distribution of your customers, or to measure the success of your business. In addition to that, you want to provide the same insights to your individual customers. That’s exactly what Custom Hostname Analytics allows you to do!

The SaaS Setup

Imagine you run a SaaS service for burrito shops, called The Burrito Bot. You have your burrito service set up on shop.theburritobot.com and your customers can use your service either through a subdomain of your zone, i.e. dina.theburritobot.com, or through their own website e.g. burrito.example.com.


When customers onboard to your burrito service, they become fully reliant on you to provide their website with the fastest load time, the best protection, and the highest uptime. Similarly, when SaaS providers onboard to Cloudflare, they expect the same — and we deliver. The easiest way that we show this to our customers is through analytics. We put ourselves in front of their website, blocking attacks and accelerating traffic. Then, through dashboards like Bot Analytics or Cache Analytics, we show insights about bad bots and low latency in real time.

In the same way that it’s our responsibility to show SaaS providers all the benefits we’re providing for their traffic, we think SaaS providers should be able to provide the same information to their customers.

Analytics for the SaaS Provider

As a SaaS provider, your infrastructure is your customers’ infrastructure, so you need to have visibility into the traffic of your service to be able to make business decisions. Being able to answer questions like “how many total requests am I getting on my service?”, “Which customer is transferring the most data?”, or “How many global customers do I have?” can help you figure out how to bill your customers or how and where to scale your infrastructure. With custom hostname analytics, you can get the full view of how customers are using your services through our Dashboard or API.


Here you can see that one custom hostname is using significantly more data transfer than the others, and you might want to charge them accordingly. Alternatively, you can look at the geographic breakdown of your customers. If it looks like burrito shops are growing in Europe, you might want to think about expanding your business there and adding new origins to serve that traffic.


Analytics for your customers

In the same way that you want to see the breakdown of traffic bound for your service, your customers want to see the same information about their website.  They want to know how many page views they’re getting, if they’re having any bots ordering fake burritos, or how fast their website is. With custom hostname analytics, we’re giving SaaS providers the resources they need to present this data to their customers.

Build your own dashboards!

The most powerful way to use our technology would be to use our GraphQL Analytics API with the clientRequestHTTPHost field to get analytics for each of your customers’ domains. This will allow you to build your own dashboards and display the information that you feel is important to your customers.
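
As a rough illustration, the following Python sketch queries the GraphQL Analytics API for request counts grouped by hostname. The dataset name (httpRequestsAdaptiveGroups) and the exact filter fields are assumptions based on the public API at the time of writing, so verify them against the GraphQL schema for your zone before relying on them.

# Minimal sketch: request counts per custom hostname via the GraphQL Analytics API.
# The dataset and field names are assumptions; verify them against the GraphQL schema.
import requests

API_TOKEN = "<your-api-token>"   # placeholder
ZONE_TAG = "<your-zone-id>"      # placeholder

QUERY = """
query ($zoneTag: string, $since: Time!) {
  viewer {
    zones(filter: { zoneTag: $zoneTag }) {
      httpRequestsAdaptiveGroups(limit: 100, filter: { datetime_geq: $since }) {
        count
        dimensions { clientRequestHTTPHost }
      }
    }
  }
}
"""

resp = requests.post(
    "https://api.cloudflare.com/client/v4/graphql",
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    json={"query": QUERY,
          "variables": {"zoneTag": ZONE_TAG, "since": "2021-08-01T00:00:00Z"}},
    timeout=30,
)
resp.raise_for_status()
for group in resp.json()["data"]["viewer"]["zones"][0]["httpRequestsAdaptiveGroups"]:
    print(group["dimensions"]["clientRequestHTTPHost"], group["count"])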

Show your customer the bad traffic you’re blocking

Let’s say you’re using Cloudflare for SaaS and are extending Bot Management to your thousands of customers — wouldn’t you want to show them how much malicious traffic you’re keeping away from them?

You can do that!

One way to see the analytics for your custom hostnames is in the Dashboard. You can either look up the total requests for an individual hostname or — by adding the filter Host does not equal theburritobot.com — total requests for all your custom hostnames.


What else can I see?

You can use Custom Hostname analytics for just about everything that you can see for your own domain. From your firewall to bot protection, you can use any dataset with a clientRequestHTTPHost field for your custom hostname analytics.

Interested in trying this out?

Sign up for our Cloudflare for SaaS Beta. We are continuing to accept applicants and are excited to announce that General Availability is not too far away.

Share what you build

We’d love to see what kind of dashboards you build with our Analytics API. If you want to share what you’ve built, tweet at @Cloudflare and send us a screenshot!

Decoding the Social Effects Of Media with Machine Learning

Post Syndicated from Julien Simon original https://aws.amazon.com/blogs/aws/decoding-the-social-effects-of-media-with-machine-learning/

What if media were optimized to benefit people? This thought-provoking question is at the core of Harmony Labs‘ mission. A nonprofit organization headquartered in New York City, Harmony Labs strives to better understand the impact of media on society, and build communities and tools to reform and transform media systems.

As Brian Waniewski, Executive Director at Harmony Labs, puts it: “The media systems that we have now, for better or worse, have become outrage machines and sorting machines that put people into groups of like minds. The business incentive structures of these systems are such that the more outrage there is, the more profit there is. Political events across the world in recent years have borne out what these media systems produce, and it’s really pretty toxic, and pretty hard to get anything done within. There are all kinds of natural divisions between people, but these media systems tend to reinforce these divisions. So, the first question that we’re asking is, What’s the scope of this problem? And then, What can we do to solve it?”

Harmony Labs use data science and machine learning to answer these questions. Starting from user surveys and media data, they developed advanced natural language processing pipelines that can identify how social issues are represented in media, how they are consumed by different audiences, and what kind of influence that consumption has.

So how do you get that data in the first place? Brian says: “All the media data that we need lives inside private companies. We knew that data sharing would be central to our mission, and this is why we’re structured as a nonprofit. We are working in the public’s interest in a non-partisan way. We’ve done big data sharing deals with large companies, as well as startups scraping different corners of the media ecosystem that we’re interested in: internet TV, internet radio, and so on. We have about 10 data partners at the moment, and we’re always looking to expand.”

Thanks to their data partners, Harmony Labs has collected over 50 Terabytes of diverse media: TV, web, mobile, song lyrics, closed captions, social media, and more. This definitely fits the definition of big data (volume, velocity and variety). Working with languages like Golang, Python and R, the Harmony Labs data science and engineering teams rely on AWS services such as Amazon Aurora, Amazon Athena, AWS Glue and Amazon Elastic Kubernetes Service (EKS) to build their data ingestion and processing workflows.

Once the data is in-house, Harmony Labs make it safe, secure, and accessible to a network of academic researchers who use it to investigate the influence of media systems on politics, society, and culture. Laura Edelson is one of these researchers. A Ph.D. Candidate in Computer Science at NYU’s Tandon School of Engineering, she studies online political communication and develops methods to identify inauthentic content and activity. Harmony Labs supported her on the Ad Observatory project, an exploration of political ads on Facebook.

Harmony Labs also work on their own projects, such as the Narrative Observatory. “A narrative is a story pattern that recurs across different kinds of stories and media. You’ll find them in song lyrics, TV shows, news articles, and more,” says Brian. The Narrative Observatory helps identify narratives on particular topics and track them over long periods of time, and across different media types.

With initial funding from the Bill & Melinda Gates Foundation, Harmony Labs studied narratives linked to the topic of poverty and economic mobility in the United States. Collecting millions of documents (online news, social media, music), they first identified the main narratives present in media. Then, using segmentation techniques, behavioral data on over 50,000 Americans and surveys, Harmony Labs defined four audiences, as well as their dominant narrative, their core values, and their views on specific social issues. Finally, Harmony Labs studied how each audience consumed narratives.

To enable funders, partners, and media companies to gain a deeper understanding of the cultural spaces their audiences occupy, they built a fascinating website, obiaudiences.org, where you can pick an audience and view the associated media feed. In other words, you can literally see the world in someone else’s eyes: what issues they care most about, what media they read most, and so on. This helps to understand the perceptions that different people have on certain issues, and as Brian puts it: “If you’re trying to reach people, it’s important to understand the media world they inhabit, and what can be actually relevant to that world.”

Narrative Observatory

Recently, Harmony Labs led a project funded by the Mozilla Foundation on defining a healthy narrative for artificial intelligence (AI). Studying the TV consumption habits of over 80,000 US adults, and connecting them with closed captioned transcripts and ads, they identified and named the main media narratives on AI. Each narrative includes a definition of AI, the emotion that it creates in people, and whether they think that AI will lead to a happy or an unhappy ending.

Harmony Labs identified four main narratives on AI. Two are extremely negative and fear-inducing. “Tool of Tyranny” says that AI will be used by governments to oppress people. “Robot Overlords” says that we’ll never be able to control AI, and it will end up ruling us. At the other end of the spectrum, the “Wishes Granted” narrative is extremely positive: sure, we don’t understand AI, but it’s a magic wand that will solve all our issues. The last narrative, “Augmented Intelligence”, is more balanced: yes, AI is a great opportunity to improve our daily lives, but it’s also capable of being unfair and even dangerous. We are responsible for designing it, controlling it, and making sure it’s used to help us, not to hurt us.

Harmony Labs found that the “Wishes Granted” narrative was the most prominent (67%). It shines a positive light on AI, but its naive and over-optimistic vision can hide the legitimate questions that AI raises. Still, it’s a good starting point to engage audiences, educate them with the “Augmented Intelligence” narrative, and increase their awareness on both opportunities and challenges.

Closing this post, I’m wondering which AI narrative I’ve actually promoted here, willingly or not! What do you think? One thing is certain: Harmony Labs is using AI to help us understand how media influences us every day, and how we can create a more democratic society. This is important work, and we’re humbled that they picked AWS to help them reach their goals.

For more information on Harmony Labs, please visit harmonylabs.org and harmonylabs.medium.com.

– Julien

Kinesis Data Firehose now supports dynamic partitioning to Amazon S3

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/kinesis-data-firehose-now-supports-dynamic-partitioning-to-amazon-s3/

Amazon Kinesis Data Firehose provides a convenient way to reliably load streaming data into data lakes, data stores, and analytics services. It can capture, transform, and deliver streaming data to Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon Elasticsearch Service, generic HTTP endpoints, and service providers like Datadog, New Relic, MongoDB, and Splunk. It is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, transform, and encrypt your data streams before loading, which minimizes the amount of storage used and increases security.

Customers who use Amazon Kinesis Data Firehose often want to partition their incoming data dynamically based on information that is contained within each record, before sending the data to a destination for analysis. An example of this would be segmenting incoming Internet of Things (IoT) data based on what type of device generated it: Android, iOS, FireTV, and so on. Previously, customers would need to run an entirely separate job to repartition their data after it lands in Amazon S3 to achieve this functionality.

Kinesis Data Firehose data partitioning simplifies the ingestion of streaming data into Amazon S3 data lakes, by automatically partitioning data in transit before it’s delivered to Amazon S3. This makes the datasets immediately available for analytics tools to run their queries efficiently and enhances fine-grained access control for data. For example, marketing automation customers can partition data on the fly by customer ID, which allows customer-specific queries to query smaller datasets and deliver results faster. IT operations or security monitoring customers can create groupings based on event timestamps that are embedded in logs, so they can query smaller datasets and get results faster.

In this post, we’ll discuss the new Kinesis Data Firehose dynamic partitioning feature, how to create a dynamic partitioning delivery stream, and walk through a real-world scenario where dynamically partitioning data that is delivered into Amazon S3 could improve the performance and scalability of the overall system. We’ll then discuss some best practices around what makes a good partition key, how to handle nested fields, and integrating with Lambda for preprocessing and error handling. Finally, we’ll cover the limits and quotas of Kinesis Data Firehose dynamic partitioning, and some pricing scenarios.

Data partitioning with Kinesis Data Firehose

First, let’s discuss why you might want to use dynamic partitioning instead of Kinesis Data Firehose’s standard timestamp-based data partitioning. Consider a scenario where your analytical data lake in Amazon S3 needs to be filtered according to a specific field, such as a customer identification—customer_id. Using the standard timestamp-based strategy, your data will look something like this, where <DOC-EXAMPLE-BUCKET> stands for your bucket name.

s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=01/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=01/part-0001.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=02/part-0002.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=02/part-0003.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=03/part-0004.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=03/part-0005.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=04/part-0006.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=04/part-0007.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=05/part-0008.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=05/part-0009.parquet
s3://<DOC-EXAMPLE-BUCKET>/year=2021/month=06/day=06/part-0010.parquet

The difficulty in identifying particular customers within this array of data is that a full file scan will be required to locate any individual customer. Now consider the data partitioned by the identifying field, customer_id.

s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-000/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-001/part-0001.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-002/part-0002.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-003/part-0003.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-004/part-0004.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-005/part-0005.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-006/part-0006.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-007/part-0007.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-008/part-0008.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-009/part-0009.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=customer-010/part-0010.parquet

In this data partitioning scheme, you only need to scan one folder to find data related to a particular customer. This is how analytics query engines like Amazon Athena, Amazon Redshift Spectrum, or Presto are designed to work—they prune unneeded partitions during query execution, thereby reducing the amount of data that is scanned and transferred. Partitioning data like this will result in less data scanned overall.

Key features

With the launch of Kinesis Data Firehose Dynamic Partitioning, you can now enable data partitioning to be dynamic, based on data content within the AWS Management Console, AWS Command Line Interface (AWS CLI), or AWS SDK when you create or update an existing Kinesis Data Firehose delivery stream.

At a high level, Kinesis Data Firehose Dynamic Partitioning allows for easy extraction of keys from incoming records into your delivery stream by allowing you to select and extract JSON data fields in an easy-to-use query engine.

Kinesis Data Firehose Dynamic Partitioning and key extraction will result in larger file sizes landing in Amazon S3, in addition to allowing for columnar data formats, like Apache Parquet, that query engines prefer.

With Kinesis Data Firehose Dynamic Partitioning, you have the ability to specify delimiters to detect or add on to your incoming records. This makes it possible to clean and organize data in a way that a query engine like Amazon Athena or AWS Glue would expect. This not only saves time but also cuts down on additional processes after the fact, potentially reducing costs in the process.

Kinesis Data Firehose has built-in support for extracting the keys for partitioning records that are in JSON format. You can select and extract the JSON data fields to be used in partitioning by using JSONPath syntax. These fields will then dictate how the data is partitioned when it’s delivered to Amazon S3. As we’ll discuss in the walkthrough later in this post, extracting a well-distributed set of partition keys is critical to optimizing your Kinesis Data Firehose delivery stream that uses  dynamic partitioning.

If the incoming data is compressed, encrypted, or in any other file format, you can include in the PutRecord or PutRecords API calls the data fields for partitioning. You can also use the integrated Lambda function with your own custom code to decompress, decrypt, or transform the records to extract and return the data fields that are needed for partitioning. This is an expansion of the existing transform Lambda function that is available today with Kinesis Data Firehose. You can transform, parse, and return the data fields by using the same Lambda function.

In order to achieve larger file sizes when it sinks data to Amazon S3, Kinesis Data Firehose buffers incoming streaming data to a specified size or time period before it delivers to Amazon S3. Buffer sizes range from 1 MB to 4 GB when data is delivered to Amazon S3, and the buffering interval ranges from 1 minute to 1 hour.

Example of a partitioning structure

Consider the following clickstream event.

{
    "type": {
        "device": "mobile",
        "event": "user_clicked_submit_button"
    },
    "customer_id": "1234",
    "event_timestamp":1630442383,
    "geo": "US"
}

You can now partition your data by customer_id, so Kinesis Data Firehose will automatically group all events with the same customer_id and deliver them to separate folders in your S3 destination bucket. The new folders will be created dynamically—you only specify which JSON field will act as dynamic partition key.

Assume that you want to have the following folder structure in your S3 data lake.

s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/part-0001.parquet

s3://<DOC-EXAMPLE-BUCKET>/customer_id=4567/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=4567/part-0001.parquet 

The Kinesis Data Firehose configuration for the preceding example will look like the one shown in the following screenshot.

Kinesis Data Firehose evaluates the prefix expression at runtime. It groups records that match the same evaluated S3 prefix expression into a single dataset. Kinesis Data Firehose then delivers each dataset to the evaluated S3 prefix. The frequency of dataset delivery to S3 is determined by the delivery stream buffer setting.
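
If you prefer to configure this programmatically instead of in the console, the following boto3 sketch shows roughly how the same dynamic partitioning setup can be expressed. The role and bucket ARNs, stream name, and buffer sizes are placeholders, so treat this as a starting point rather than a drop-in script.

# Minimal sketch: create a delivery stream that partitions by customer_id using
# inline JSON parsing (jq). ARNs, names, and buffer sizes are placeholders.
import boto3

firehose = boto3.client("firehose")

firehose.create_delivery_stream(
    DeliveryStreamName="clickstream-dynamic-partitioning",
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::111122223333:role/firehose-delivery-role",
        "BucketARN": "arn:aws:s3:::DOC-EXAMPLE-BUCKET",
        # The key extracted below is referenced in the S3 prefix expression
        "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/",
        "ErrorOutputPrefix": "errors/!{firehose:error-output-type}/",
        "BufferingHints": {"SizeInMBs": 64, "IntervalInSeconds": 60},
        "DynamicPartitioningConfiguration": {"Enabled": True},
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "MetadataExtraction",
                    "Parameters": [
                        {"ParameterName": "MetadataExtractionQuery",
                         "ParameterValue": "{customer_id: .customer_id}"},
                        {"ParameterName": "JsonParsingEngine",
                         "ParameterValue": "JQ-1.6"},
                    ],
                }
            ],
        },
    },
)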

You can do even more with the jq JSON processor, including accessing nested fields and creating complex queries to identify specific keys among the data.

In the following example, I decide to store events in a way that will allow me to scan events from the mobile devices of a particular customer.

{
    "type": {
        "device": "mobile",
        "event": "user_clicked_submit_button"
    },
    "customer_id": "1234",
    "event_timestamp": 1630442383
    "geo": "US"
}

Given the same event, I’ll use both the device and customer_id fields in the Kinesis Data Firehose prefix expression, as shown in the following screenshot. Notice that the device is a nested JSON field.

The generated S3 folder structure will be as follows, where <DOC-EXAMPLE-BUCKET> is your bucket name.

s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/device=mobile/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/device=mobile/part-0001.parquet

s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/device=desktop/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/device=desktop/part-0001.parquet

Now assume that you want to partition your data based on the time when the event actually was sent, as opposed to using Kinesis Data Firehose native support for ApproximateArrivalTimestamp, which represents the time in UTC when the record was successfully received and stored in the stream. The time in the event_timestamp field might be in a different time zone.

With Kinesis Data Firehose Dynamic Partitioning, you can extract and transform field values on the fly. I’ll use the event_timestamp field to partition the events by year, month, and day, as shown in the following screenshot.

The preceding expression will produce the following S3 folder structure, where <DOC-EXAMPLE-BUCKET> is your bucket name.

s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/year=2021/month=05/day=21/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/year=2021/month=05/day=21/part-0001.parquet

s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/year=2021/month=05/day=22/part-0000.parquet
s3://<DOC-EXAMPLE-BUCKET>/customer_id=1234/year=2021/month=05/day=22/part-0001.parquet

Create a dynamically partitioned delivery stream

To begin delivering dynamically partitioned data into Amazon S3, navigate to the Amazon Kinesis console page by searching for or selecting Kinesis.

From there, choose Create Delivery Stream, and then select your source and sink.

For this example, you will receive data from a Kinesis Data Stream, but you can also choose Direct PUT or other sources as the source of your delivery stream.

For the destination, choose Amazon S3.

Next, choose the Kinesis Data Stream source to read from. If you have a Kinesis Data Stream previously created, simply choose Browse and select from the list. If not, follow this guide on how to create a Kinesis Data Stream.

Give your delivery stream a name and continue on to the Transform and convert records section of the create wizard.

In order to transform your source records with AWS Lambda, you can enable data transformation. This process will be covered in the next section, and we’ll leave both the AWS Lambda transformation as well as the record format conversion disabled for simplicity.

For your S3 destination, select or create an S3 bucket that your delivery stream has permissions to.

Below that setting, you can now enable dynamic partitioning on the incoming data in order to deliver data to different S3 bucket prefixes based on your specified JSONPath query.

You now have the option to enable the following features, shown in the screenshot below:

  • Multi record deaggregation – Separate records that enter your delivery stream based on valid JSON criteria or a new line delimiter such as \n. This can be useful for data that comes in to your delivery stream in a specific format, but needs to be reformatted according to the downstream analysis engine.
  • New line delimiter – Configure your delivery stream to add a new line delimiter between records within objects delivered to Amazon S3, such as \n.
  • Inline parsing for JSON – Specify data record parameters to be used as dynamic partitioning keys, and provide a value for each key.

You can simply add your key/value pairs, then choose Apply dynamic partitioning keys to apply the partitioning scheme to your S3 bucket prefix. Keep in mind that you will also need to supply an error prefix for your S3 bucket before continuing.

Set your S3 buffering conditions appropriately for your use case. In my example, I’ve lowered the buffering to the minimum of 1 MiB of data delivered, or 60 seconds before delivering data to Amazon S3.

Keep the defaults for the remaining settings, and then choose Create delivery stream.

After data begins flowing through your pipeline, within the buffer interval you will see data appear in S3, partitioned according to the configurations within your Kinesis Data Firehose.

For delivery streams without the dynamic partitioning feature enabled, there will be one buffer across all incoming data. When data partitioning is enabled, Kinesis Data Firehose will have a buffer per partition, based on incoming records. The delivery stream will deliver each buffer of data as a single object when the size or interval limit has been reached, independent of other data partitions.

Lambda transformation of non-JSON records

If the data flowing through a Kinesis Data Firehose is compressed, encrypted, or in any non-JSON file format, the dynamic partitioning feature won’t be able to parse individual fields by using the JSONPath syntax specified previously. To use the dynamic partitioning feature with non-JSON records, use the integrated Lambda function with Kinesis Data Firehose to transform and extract the fields needed to properly partition the data by using JSONPath.

The following Lambda function will decode a user payload, extract the necessary fields for the Kinesis Data Firehose dynamic partitioning keys, and return a properly formed Kinesis Data Firehose record, with the partition keys encapsulated in the outer payload.

# This is an Amazon Kinesis Data Firehose stream processing Lambda function that  
# replays every read record from input to output  
# and extracts partition keys from the records.  
  
from __future__ import print_function  
import base64  
import json  
import datetime  
  
# Signature for all Lambda functions that user must implement  
def lambda_handler(firehose_records_input, context):  
      
    # Create return value.  
    firehose_records_output = {}  
    # Create result object.  
    firehose_records_output['records'] = []  
  
    print("\n")  
    # Go through records and process them.  
    for firehose_record_input in firehose_records_input['records']:  
  
        # Get user payload.  
        payload = base64.b64decode(firehose_record_input['data'])  
        jsonVal = json.loads(payload)  
          
        print("Record that was received")  
        print(jsonVal)  
        print("\n")  
        # Create output Firehose record and add modified payload and record ID to it.  
        firehose_record_output = {}  
        eventTimestamp = datetime.datetime.fromtimestamp(jsonVal['eventTimestamp'])  
        partitionKeys = {}  
        partitionKeys["customerId"] = jsonVal['customerId']  
        partitionKeys["year"] = eventTimestamp.strftime('%Y')  
        partitionKeys["month"] = eventTimestamp.strftime('%m')  
        partitionKeys["date"] = eventTimestamp.strftime('%d')  
        partitionKeys["hour"] = eventTimestamp.strftime('%H')  
        partitionKeys["minute"] = eventTimestamp.strftime('%M')  
  
          
        # Must set proper record ID.  
        firehose_record_output['recordId'] = firehose_record_input['recordId']  
        firehose_record_output['data'] = firehose_record_input['data']  
        firehose_record_output['result'] =  'Ok'  
        firehose_record_output['partitionKeys'] =  partitionKeys  
  
        # Add the record to the list of output records.  
        firehose_records_output['records'].append(firehose_record_output)  
  
    # At the end return processed records.  
    return firehose_records_output  

Using Lambda to extract the necessary fields for dynamic partitioning provides both the benefit of encrypted and compressed data and the benefit of dynamically partitioning data based on record fields.

Limits and quotas

Kinesis Data Firehose Dynamic Partitioning has a limit of 500 active partitions per delivery stream while it is actively buffering data—in other words, how many active partitions exist in the delivery stream during the configured buffering hints. This limit is adjustable, and if you want to increase it, you’ll need to submit a support ticket for a limit increase.

Each new value that is determined by the JSONPath select query will result in a new partition in the Kinesis Data Firehose delivery stream. The partition has an associated buffer of data that will be delivered to Amazon S3 in the evaluated partition prefix. Upon delivery to Amazon S3, the buffer that previously held that data and the associated partition will be deleted and deducted from the active partitions count in Kinesis Data Firehose.

Consider the following records that were ingested to my delivery stream.

{"customer_id": "123"}
{"customer_id": "124"}
{"customer_id": "125"}

If I decide to use customer_id for my dynamic data partitioning and deliver records to different prefixes, I’ll have three active partitions if the records keep ingesting for all of my customers. When there are no more records for "customer_id": "123", Kinesis Data Firehose will delete the buffer and will keep only two active partitions.

If you exceed the maximum number of active partitions, the rest of the records in the delivery stream will be delivered to the S3 error prefix. For more information, see the Error Handling section of this blog post.

A maximum throughput of 25 MB per second is supported for each active partition. This limit is not adjustable. You can monitor the throughput with the new metric called PerPartitionThroughput.

Best practices

The right partitioning can help you to save costs related to the amount of data that is scanned by analytics services like Amazon Athena. On the other hand, over-partitioning may lead to the creation of smaller objects and wipe out the initial benefit of cost and performance. See Top 10 Performance Tuning Tips for Amazon Athena.

We advise you to align your partitioning keys with your analysis queries downstream to promote compatibility between the two systems. At the same time, take into consideration how high cardinality can impact the dynamic partitioning active partition limit.

When you decide which fields to use for the dynamic data partitioning, it’s a fine balance between picking fields that match your business case and taking into consideration the partition count limits. You can monitor the number of active partitions with the new metric PartitionCount, as well as the number of partitions that have exceeded the limit with the metric PartitionCountExceeded.
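
If you want to watch these metrics programmatically, a minimal CloudWatch sketch might look like the following. The metric names come from this post; the namespace, statistic, and period are assumptions worth verifying in the CloudWatch console.

# Minimal sketch: read the active-partition metric for a delivery stream.
# The namespace, statistic, and period are assumptions; the metric name is from this post.
import datetime
import boto3

cloudwatch = boto3.client("cloudwatch")

def partition_count(stream_name):
    resp = cloudwatch.get_metric_statistics(
        Namespace="AWS/Firehose",
        MetricName="PartitionCount",
        Dimensions=[{"Name": "DeliveryStreamName", "Value": stream_name}],
        StartTime=datetime.datetime.utcnow() - datetime.timedelta(hours=1),
        EndTime=datetime.datetime.utcnow(),
        Period=300,
        Statistics=["Maximum"],
    )
    return sorted(resp["Datapoints"], key=lambda d: d["Timestamp"])

for point in partition_count("clickstream-dynamic-partitioning"):
    print(point["Timestamp"], point["Maximum"])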

Another way to optimize cost is to aggregate your events into a single PutRecord or PutRecordBatch API call. Because Kinesis Data Firehose is billed per GB of data ingested, which is calculated as the number of data records you send to the service times the size of each record rounded up to the nearest 5 KB, you can put more data in each ingestion call.

Data partition functionality is run after data is de-aggregated, so each event will be sent to the corresponding Amazon S3 prefix based on the partitionKey field within each event.
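
For illustration, a producer could pack several newline-delimited events into a single PutRecord call, as in the following sketch. The stream name and events are placeholders, and it assumes multi record deaggregation with a new line delimiter is enabled on the delivery stream, as described earlier in this post.

# Minimal sketch: pack several newline-delimited JSON events into one record payload
# so each ingestion call carries more data. Names and events are placeholders.
import json
import boto3

firehose = boto3.client("firehose")

events = [
    {"customer_id": "123", "event_timestamp": 1630442383, "geo": "US"},
    {"customer_id": "124", "event_timestamp": 1630442390, "geo": "US"},
    {"customer_id": "125", "event_timestamp": 1630442401, "geo": "DE"},
]

# With multi record deaggregation enabled, Kinesis Data Firehose splits the payload
# back into individual events before dynamic partitioning is applied.
payload = "".join(json.dumps(e) + "\n" for e in events).encode("utf-8")

response = firehose.put_record(
    DeliveryStreamName="clickstream-dynamic-partitioning",
    Record={"Data": payload},
)
print("RecordId:", response["RecordId"])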

Error handling

Imagine that the following record enters your Kinesis Data Firehose delivery stream.

{"customerID": 1000}

When your dynamic partition query scans over this record, it will be unable to locate the specified key of customer_id, and this will result in an error. In this scenario, we suggest configuring an S3 error prefix when you create or modify your Kinesis Data Firehose delivery stream.

All failed records will be delivered to the error prefix. The records you might find there are the events without the field you specified as your partition key.

Cost and pricing examples

Kinesis Data Firehose Dynamic Partitioning is billed per GB of partitioned data delivered to S3, per object, and optionally per jq processing hour for data parsing. The cost can vary based on the AWS Region where you decide to create your stream.

For more information, see our pricing page.

Summary

In this post, we discussed the Kinesis Data Firehose Dynamic Partitioning feature, and explored the use cases where this feature can help improve pipeline performance. We also covered how to develop and optimize a Kinesis Data Firehose pipeline by using dynamic partitioning and the best practices around building a reliable delivery stream.

Kinesis Data Firehose dynamic partitioning will be available in all Regions at launch, and we urge you to try the new feature to see how it can simplify your delivery stream and query engine performance. Be sure to provide us with any feedback you have about the new feature.


About the Authors

Jeremy Ber has been working in the telemetry data space for the past 5 years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. In the past, Jeremy has supported and built systems that stream in terabytes of data per day and process complex machine learning algorithms in real time. At AWS, he is a Solutions Architect Streaming Specialist, supporting both Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Kinesis.

Michael Greenshtein’s career started in software development and shifted to DevOps. Michael worked with AWS services to build complex data projects involving real-time, ETLs, and batch processing. Now he works in AWS as Solutions Architect in the Europe, Middle East, and Africa (EMEA) region, supporting a variety of customer use cases.