Tag Archives: Big Data

Let’s Architect! Modern data architectures

Post Syndicated from Luca Mezzalira original https://aws.amazon.com/blogs/architecture/lets-architect-modern-data-architectures/

With the rapid growth in data coming from data platforms and applications, and the continuous improvements in state-of-the-art machine learning algorithms, data are becoming key assets for companies.

Modern data architectures include data mesh—a recent style that represents a paradigm shift, in which data is treated as a product and data architectures are designed around business domains. This type of approach supports the idea of distributed data, where each business domain focuses on the quality of the data it produces and exposes to the consumers.

In this edition of Let’s Architect!, we focus on data mesh and how it is designed on AWS, plus other approaches to adopt modern architectural patterns.

Design a data mesh architecture using AWS Lake Formation and AWS Glue

Domain Driven Design (DDD) is a software design approach where a solution is divided into domains aligned with business capabilities, software, and organizational boundaries. Unlike software architectures, most data architectures are often designed around technologies rather than business domains.

In this blog, you can learn about data mesh, an architectural pattern that applies the principles of DDD to data architectures. Data are organized into domains and considered the product that each team owns and offers for consumption.

A data mesh design organizes around data domains. Each domain owns multiple data products with their own data and technology stacks

A data mesh design organizes around data domains. Each domain owns multiple data products with their own data and technology stacks

Building Data Mesh Architectures on AWS

In this video, discover how to use the data mesh approach in AWS. Specifically, how to implement certain design patterns for building a data mesh architecture with AWS services in the cloud.

This is a pragmatic presentation to get a quick understanding of data mesh fundamentals, the benefits/challenges, and the AWS services that you can use to build it. This video provides additional context to the aforementioned blog post and includes several examples on the benefits of modern data architectures.

This diagram demonstrates the pattern for sharing data catalogs between producer domains and consumer domains

This diagram demonstrates the pattern for sharing data catalogs between producer domains and consumer domains

Build a modern data architecture on AWS with Amazon AppFlow, AWS Lake Formation, and Amazon Redshift

In this blog, you can learn how to build a modern data strategy using AWS managed services to ingest data from sources like Salesforce. Also discussed is how to automatically create metadata catalogs and share data seamlessly between the data lake and data warehouse, plus creating alerts in the event of an orchestrated data workflow failure.

The second part of the post explains how a data warehouse can be built by using an agile data modeling pattern, as well as how ELT jobs were quickly developed, orchestrated, and configured to perform automated data quality testing.

A data platform architecture and the subcomponents used to build it

A data platform architecture and the subcomponents used to build it

AWS Lake Formation Workshop

With a modern data architecture on AWS, architects and engineers can rapidly build scalable data lakes; use a broad and deep collection of purpose-built data services; and ensure compliance via unified data access, security, and governance. As data mesh is a modern architectural pattern, you can build it using a service like AWS Lake Formation.

Familiarize yourself with new technologies and services by not only learning how they work, but also to building prototypes and projects to gain hands-on experience. This workshop allows builders to become familiar with the features of AWS Lake Formation and its integrations with other AWS services.

A data catalog is a key component in a data mesh architecture. AWS Glue crawlers interact with data stores and other elements to populate the data catalog

A data catalog is a key component in a data mesh architecture. AWS Glue crawlers interact with data stores and other elements to populate the data catalog

See you next time!

Thanks for joining our discussion on data mesh! See you in a couple of weeks when we talk more about architectures and the challenges that we face every day while working with distributed systems.

Other posts in this series

Looking for more architecture content?

AWS Architecture Center provides reference architecture diagrams, vetted architecture solutions, Well-Architected best practices, patterns, icons, and more!

Interactively develop your AWS Glue streaming ETL jobs using AWS Glue Studio notebooks

Post Syndicated from Arun A K original https://aws.amazon.com/blogs/big-data/interactively-develop-your-aws-glue-streaming-etl-jobs-using-aws-glue-studio-notebooks/

Enterprise customers are modernizing their data warehouses and data lakes to provide real-time insights, because having the right insights at the right time is crucial for good business outcomes. To enable near-real-time decision-making, data pipelines need to process real-time or near-real-time data. This data is sourced from IoT devices, change data capture (CDC) services like AWS Data Migration Service (AWS DMS), and streaming services such as Amazon Kinesis, Apache Kafka, and others. These data pipelines need to be robust, able to scale, and able to process large data volumes in near-real time. AWS Glue streaming extract, transform, and load (ETL) jobs process data from data streams, including Kinesis and Apache Kafka, apply complex transformations in-flight, and load it into a target data stores for analytics and machine learning (ML).

Hundreds of customers are using AWS Glue streaming ETL for their near-real-time data processing requirements. These customers required an interactive capability to process streaming jobs. Previously, when developing and running a streaming job, you had to wait for the results to be available in the job logs or persisted into a target data warehouse or data lake to be able to view the results. With this approach, debugging and adjusting code is difficult, resulting in a longer development timeline.

Today, we are launching a new AWS Glue streaming ETL feature to interactively develop streaming ETL jobs in AWS Glue Studio notebooks and interactive sessions.

In this post, we provide a use case and step-by-step instructions to develop and debug your AWS Glue streaming ETL job using a notebook.

Solution overview

To demonstrate the streaming interactive sessions capability, we develop, test, and deploy an AWS Glue streaming ETL job to process Apache Webserver logs. The following high-level diagram represents the flow of events in our job.
BDB-2464 High Level Application Architecture
Apache Webserver logs are streamed to Amazon Kinesis Data Streams. An AWS Glue streaming ETL job consumes the data in near-real time and runs an aggregation that computes how many times a webpage has been unavailable (status code 500 and above) due to an internal error. The aggregate information is then published to a downstream Amazon DynamoDB table. As part of this post, we develop this job using AWS Glue Studio notebooks.

You can either work with the instructions provided in the notebook, which you download when instructed later in this post, or follow along with this post to author your first streaming interactive session job.

Prerequisites

To get started, click the Launch Stack button below, to run an AWS CloudFormation template on your AWS environment.

BDB-2063-launch-cloudformation-stack

The template provisions a Kinesis data stream, DynamoDB table, AWS Glue job to generate simulated log data, and the necessary AWS Identity and Access Management (IAM) role and polices. After you deploy your resources, you can review the Resources tab on the AWS CloudFormation console for detailed information.

Set up the AWS Glue streaming interactive session job

To set up your AWS Glue streaming job, complete the following steps:

  1. Download the notebook file and save it to a local directory on your computer.
  2. On the AWS Glue console, choose Jobs in the navigation pane.
  3. Choose Create job.
  4. Select Jupyter Notebook.
  5. Under Options, select Upload and edit an existing notebook.
  6. Choose Choose file and browse to the notebook file you downloaded.
  7. Choose Create.
BDB-2464 Create Job
  1. For Job name¸ enter a name for the job.
  2. For IAM Role, use the role glue-iss-role-0v8glq, which is provisioned as part of the CloudFormation template.
  3. Choose Start notebook job.
BDB-2464 Start Notebook

You can see that the notebook is loaded into the UI. There are markdown cells with instructions as well as code blocks that you can run sequentially. You can either run the instructions on the notebook or follow along with this post to continue with the job development.

BDB-2464 Explore Notebook

Run notebook cells

Let’s run the code block that has the magics. The notebook has notes on what each magic does.

  1. Run the first cell.
BDB-2464 Run First Cell

After running the cell, you can see in the output section that the defaults have been reconfigured.

BDB-2464 Configurations Set

In the context of streaming interactive sessions, an important configuration is job type, which is set to streaming. Additionally, to minimize costs, the number of workers is set to 2 (default 5), which is sufficient for our use case that deals with a low-volume simulated dataset.

Our next step is to initialize an AWS Glue streaming session.

  1. Run the next code cell.
BDB-2464 Initiate Session

After we run this cell, we can see that a session has been initialized and a session ID is created.

A Kinesis data stream and AWS Glue data generator job that feeds into this stream have already been provisioned and triggered by the CloudFormation template. With the next cell, we consume this data as an Apache Spark DataFrame.

  1. Run the next cell.
BDB-2464 Fetch From Kinesis

Because there are no print statements, the cells don’t show any output. You can proceed to run the following cells.

Explore the data stream

To help enhance the interactive experience in AWS Glue interactive sessions, GlueContext provides the method getSampleStreamingDynamicFrame. It provides a snapshot of the stream in a static DynamicFrame. It takes three arguments:

  • The Spark streaming DataFrame
  • An options map
  • A writeStreamFunction to apply a function to every sampled record

Available options are as follows:

  • windowSize – Also known as the micro-batch duration, this parameter determines how long a streaming query will wait after the previous batch was triggered.
  • pollingTimeInMs – This is the total length of time the method will run. It starts at least one micro-batch to obtain sample records from the input stream. The time unit is milliseconds, and the value should be greater than the windowSize.
  • recordPollingLimit – This is defaulted to 100, and helps you set an upper bound on the number of records that is retrieved from the stream.

Run the next code cell and explore the output.

BDB-2464 Sample Data

We see that the sample consists of 100 records (the default record limit), and we have successfully displayed the first 10 records from the sample.

Work with the data

Now that we know what our data looks like, we can write the logic to clean and format it for our analytics.

Run the code cell containing the reformat function.

Note that Python UDFs aren’t the recommended way to handle data transformations in a Spark application. We use reformat() to exemplify troubleshooting. When working with a real-world production application, we recommend using native APIs wherever possible.

BDB-2464 Run The UDF

We see that the code cell failed to run. The failure was on purpose. We deliberately created a division by zero exception in our parser.

BDB-2464 Error Running The Code

Failure and recovery

In case of a regular AWS Glue job, for any error, the whole application exits, and you have to make code changes and resubmit the application. However, in case of interactive sessions, the coding context and definitions are fully preserved and the session is still operational. There is no need to bootstrap a new cluster and rerun all the preceding transformation. This allows you to focus on quickly iterating your batch function implementation to obtain the desired outcome. You can fix the defects and run them in a matter of seconds.

To test this out, go back to the code and comment or delete the erroneous line error_line=1/0 and rerun the cell.

BDB-2464 Error Corrected

Implement business logic

Now that we have successfully tested our parsing logic on the sample stream, let’s implement the actual business logic. The logics are implemented in the processBatch method within the next code cell. In this method, we do the following:

  • Pass the streaming DataFrame in micro-batches
  • Parse the input stream
  • Filter messages with status code >=500
  • Over a 1-minute interval, get the count of failures per webpage
  • Persist the preceding metric to a DynamoDB table (glue-iss-ddbtbl-0v8glq)
  1. Run the next code cell to trigger the stream processing.
BDB-2464 Trigger DDB Write
  1. Wait a few minutes for the cell to complete.
  2. On the DynamoDB console, navigate to the Items page and select the glue-iss-ddbtbl-0v8glq table.
BDB-2464 Explore DDB

The page displays the aggregated results that have been written by our interactive session job.

Deploy the streaming job

So far, we have been developing and testing our application using the streaming interactive sessions. Now that we’re confident of the job, let’s convert this into an AWS Glue job. We have seen that the majority of code cells are doing exploratory analysis and sampling, and aren’t required to be a part of the main job.

A commented code cell that represents the whole application is provided to you. You can uncomment the cell and delete all other cells. Another option would be to not use the commented cell, but delete just the two cells from the notebook that do the sampling or debugging and print statements.

To delete a cell, choose the cell and then choose the delete icon.

BDB-2464 Delete a Cell

Now that you have the final application code ready, save and deploy the AWS Glue job by choosing Save.

BDB-2464 Save Job

A banner message appears when the job is updated.

BDB-2464 Save Job Banner

Explore the AWS Glue job

After you save the notebook, you should be able to access the job like any regular AWS Glue job on the Jobs page of the AWS Glue console.

BDB-2464 Job Page

Additionally, you can look at the Job details tab to confirm the initial configurations, such as number of workers, have taken effect after deploying the job.

BDB-2464 Job Details Page

Run the AWS Glue job

If needed, you can choose Run to run the job as an AWS Glue streaming job.

BDB-2464 Job Run

To track progress, you can access the run details on the Runs tab.

BDB-2464 Job Run Details

Clean up

To avoid incurring additional charges to your account, stop the streaming job that you started as part of the instructions. Also, on the AWS CloudFormation console, select the stack that you provisioned and delete it.

Conclusion

In this post, we demonstrated how to do the following:

  • Author a job using notebooks
  • Preview incoming data streams
  • Code and fix issues without having to publish AWS Glue jobs
  • Review the end-to-end working code, remove any debugging, and print statements or cells from the notebook
  • Publish the code as an AWS Glue job

We did all of this via a notebook interface.

With these improvements in the overall development timelines of AWS Glue jobs, it’s easier to author jobs using the streaming interactive sessions. We encourage you to use the prescribed use case, CloudFormation stack, and notebook to jumpstart your individual use cases to adopt AWS Glue streaming workloads.

The goal of this post was to give you hands-on experience working with AWS Glue streaming and interactive sessions. When onboarding a productionized workload onto your AWS environment, based on the data sensitivity and security requirements, ensure you implement and enforce tighter security controls.


About the authors

Arun A K is a Big Data Solutions Architect with AWS. He works with customers to provide architectural guidance for running analytics solutions on the cloud. In his free time, Arun loves to enjoy quality time with his family.

Linan Zheng is a Software Development Engineer at AWS Glue Streaming Team, helping building the serverless data platform. His works involve large scale optimization engine for transactional data formats and streaming interactive sessions.

Roman Gavrilov is an Engineering Manager at AWS Glue. He has over a decade of experience building scalable Big Data and Event-Driven solutions. His team works on Glue Streaming ETL to allow near real time data preparation and enrichment for machine learning and analytics.

Shiv Narayanan is a Senior Technical Product Manager on the AWS Glue team. He works with AWS customers across the globe to strategize, build, develop, and deploy modern data platforms.

From centralized architecture to decentralized architecture: How data sharing fine-tunes Amazon Redshift workloads

Post Syndicated from Jingbin Ma original https://aws.amazon.com/blogs/big-data/from-centralized-architecture-to-decentralized-architecture-how-data-sharing-fine-tunes-amazon-redshift-workloads/

Amazon Redshift is a fully managed, petabyte-scale, massively parallel data warehouse that offers simple operations and high performance. It makes it fast, simple, and cost-effective to analyze all your data using standard SQL and your existing business intelligence (BI) tools. Today, Amazon Redshift has become the most widely used cloud data warehouse.

With the significant growth of data for big data analytics over the years, some customers have asked how they should optimize Amazon Redshift workloads. In this post, we explore how to optimize workloads on Amazon Redshift clusters using Amazon Redshift RA3 nodes, data sharing, and pausing and resuming clusters. For more cost-optimization methods, refer to Getting the most out of your analytics stack with Amazon Redshift.

Key features of Amazon Redshift

First, let’s review some key features:

  • RA3 nodes – Amazon Redshift RA3 nodes are backed by a new managed storage model that gives you the power to separately optimize your compute power and your storage. They bring a few very important features, one of which is data sharing. RA3 nodes also support the ability to pause and resume, which allows you to easily suspend on-demand billing while the cluster is not being used.
  • Data sharing – Amazon Redshift data sharing offers you to extend the ease of use, performance, and cost benefits of Amazon Redshift in a single cluster to multi-cluster deployments while being able to share data. Data sharing enables instant, granular, and fast data access across Redshift clusters without the need to copy or move it. You can securely share live data with Amazon Redshift clusters in the same or different AWS accounts, and across regions. You can share data at many levels, including schemas, tables, views, and user-defined functions. You can also share the most up-to-date and consistent information as it’s updated in Amazon Redshift Serverless. It also provides fine-grained access controls that you can tailor for different users and businesses that all need access to the data. However, data sharing in Amazon Redshift has a few limitations.

Solution overview

In this use case, our customer is heavily using Amazon Redshift as their data warehouse for their analytics workloads, and they have been enjoying the possibility and convenience that Amazon Redshift brought to their business. They mainly use Amazon Redshift to store and process user behavioral data for BI purposes. The data has increased by hundreds of gigabytes daily in recent months, and employees from departments continuously run queries against the Amazon Redshift cluster on their BI platform during business hours.

The company runs four major analytics workloads on a single Amazon Redshift cluster, because some data is used by all workloads:

  • Queries from the BI platform – Various queries run mainly during business hours.
  • Hourly ETL – This extract, transform, and load (ETL) job runs in the first few minutes of each hour. It generally takes about 40 minutes.
  • Daily ETL – This job runs twice a day during business hours, because the operation team needs to get daily reports before the end of the day. Each job normally takes between 1.5–3 hours. It’s the second-most resource-heavy workload.
  • Weekly ETL – This job runs in the early morning every Sunday. It’s the most resource-heavy workload. The job normally takes 3–4 hours.

The analytics team has migrated to the RA3 family and increased the number of nodes of the Amazon Redshift cluster to 12 over the years to keep the average runtime of queries from their BI tool within an acceptable time due to the data size, especially when other workloads are running.

However, they have noticed that performance is reduced while running ETL tasks, and the duration of ETL tasks is long. Therefore, the analytics team wants to explore solutions to optimize their Amazon Redshift cluster.

Because CPU utilization spikes appear while the ETL tasks are running, the AWS team’s first thought was to separate workloads and relevant data into multiple Amazon Redshift clusters with different cluster sizes. By reducing the total number of nodes, we hoped to reduce the cost of Amazon Redshift.

After a series of conversations, the AWS team found that one of the reasons that the customer keeps all workloads on the 12-node Amazon Redshift cluster is to manage the performance of queries from their BI platform, especially while running ETL workloads, which have a big impact on the performance of all workloads on the Amazon Redshift cluster. The obstacle is that many tables in the data warehouse are required to be read and written by multiple workloads, and only the producer of a data share can update the shared data.

The challenge of dividing the Amazon Redshift cluster into multiple clusters is data consistency. Some tables need to be read by ETL workloads and written by BI workloads, and some tables are the opposite. Therefore, if we duplicate data into two Amazon Redshift clusters or only create a data share from the BI cluster to the reporting cluster, the customer will have to develop a data synchronization process to keep the data consistent between all Amazon Redshift clusters, and this process could be very complicated and unmaintainable.

After more analysis to gain an in-depth understanding of the customer’s workloads, the AWS team found that we could put tables into four groups, and proposed a multi-cluster, two-way data sharing solution. The purpose of the solution is to divide the workloads into separate Amazon Redshift clusters so that we can use Amazon Redshift to pause and resume clusters for periodic workloads to reduce the Amazon Redshift running costs, because clusters can still access a single copy of data that is required for workloads. The solution should meet the data consistency requirements without building a complicated data synchronization process.

The following diagram illustrates the old architecture (left) compared to the new multi-cluster solution (right).

Improve the old architecture (left) to the new multi-cluster solution (right)

Dividing workloads and data

Due to the characteristics of the four major workloads, we categorized workloads into two categories: long-running workloads and periodic-running workloads.

The long-running workloads are for the BI platform and hourly ETL jobs. Because the hourly ETL workload requires about 40 minutes to run, the gain is small even if we migrate it to an isolated Amazon Redshift cluster and pause and resume it every hour. Therefore, we leave it with the BI platform.

The periodic-running workloads are the daily and weekly ETL jobs. The daily job generally takes about 1 hour and 40 minutes to 3 hours, and the weekly job generally takes 3–4 hours.

Data sharing plan

The next step is identifying all data (tables) access patterns of each category. We identified four types of tables:

  • Type 1 – Tables are only read and written by long-running workloads
  • Type 2 – Tables are read and written by long-running workloads, and are also read by periodic-running workloads
  • Type 3 – Tables are read and written by periodic-running workloads, and are also read by long-running workloads
  • Type 4 – Tables are only read and written by periodic-running workloads

Fortunately, there is no table that is required to be written by all workloads. Therefore, we can separate the Amazon Redshift cluster into two Amazon Redshift clusters: one for the long-running workloads, and the other for periodic-running workloads with 20 RA3 nodes.

We created a two-way data share between the long-running cluster and the periodic-running cluster. For type 2 tables, we created a data share on the long-running cluster as the producer and the periodic-running cluster as the consumer. For type 3 tables, we created a data share on the periodic-running cluster as the producer and the long-running cluster as the consumer.

The following diagram illustrates this data sharing configuration.

The long-running cluster (producer) shares type 2 tables to the periodic-running cluster (consumer). The periodic-running cluster (producer’) shares type 3 tables to the long-running cluster (consumer’)

Build two-way data share across Amazon Redshift clusters

In this section, we walk through the steps to build a two-way data share across Amazon Redshift clusters. First, let’s take a snapshot of the original Amazon Redshift cluster, which became the long-running cluster later.

Take a snapshot of the long-running-cluster from the Amazon Redshift console

Now, let’s create a new Amazon Redshift cluster with 20 RA3 nodes for periodic-running workloads. Then we migrate the type 3 and type 4 tables to the periodic-running cluster. Make sure you choose the ra3 node type. (Amazon Redshift Serverless supports data sharing too, and it becomes generally available in July 2022, so it is also an option now.)

Create the periodic-running-cluster. Make sure you select the ra3 node type.

Create the long-to-periodic data share

The next step is to create the long-to-periodic data share. Complete the following steps:

  1. On the periodic-running cluster, get the namespace by running the following query:
SELECT current_namespace;

Make sure record the namespace.

  1. On the long-running cluster, we run queries similar to the following:
CREATE DATASHARE ltop_share SET PUBLICACCESSIBLE TRUE;
ALTER DATASHARE ltop_share ADD SCHEMA public_long;
ALTER DATASHARE ltop_share ADD ALL TABLES IN SCHEMA public_long;
GRANT USAGE ON DATASHARE ltop_share TO NAMESPACE '[periodic-running-cluster-namespace]';
  1. We can validate the long-to-periodic data share using the following command:
SHOW datashares;
  1. After we validate the data share, we get the long-running cluster namespace with the following query:
SELECT current-namespace;

Make sure record the namespace.

  1. On the periodic-running cluster, run the following command to load the data from the long-to-periodic data share with the long-running cluster namespace:
CREATE DATABASE ltop FROM DATASHARE ltop_share OF NAMESPACE '[long-running-cluster-namespace]';
  1. Confirm that we have read access to tables in the long-to-periodic data share.

Create the periodic-to-long data share

The next step is to create the periodic-to-long data share. We use the namespaces of the long-running cluster and the periodic-running cluster that we collected in the previous step.

  1. On the periodic-running cluster, run queries similar to the following to create the periodic-to-long data share:
CREATE DATASHARE ptol_share SET PUBLICACCESSIBLE TRUE;
ALTER DATASHARE ptol_share ADD SCHEMA public_periodic;
ALTER DATASHARE ptol_share ADD ALL TABLES IN SCHEMA public_periodic;
GRANT USAGE ON DATASHARE ptol_share TO NAMESPACE '[long-running-cluster-namespace]';
  1. Validate the data share using the following command:
SHOW datashares;
  1. On the long-running cluster, run the following command to load the data from the periodic-to-long data using the periodic-running cluster namespace:
CREATE DATABASE ptol FROM DATASHARE ptol_share OF NAMESPACE '[periodic-running-cluster-namespace]';
  1. Check that we have read access to the tables in the periodic-to-long data share.

At this stage, we have separated workloads into two Amazon Redshift clusters and built a two-way data share across two Amazon Redshift clusters.

The next step is updating the code of different workloads to use the correct endpoints of two Amazon Redshift clusters and perform consolidated tests.

Pause and resume the periodic-running Amazon Redshift cluster

Let’s update the crontab scripts, which run periodic-running workloads. We make two updates.

  1. When the scripts start, call the Amazon Redshift check and resume cluster APIs to resume the periodic-running Amazon Redshift cluster when the cluster is paused:
    aws redshift resume-cluster --cluster-identifier [periodic-running-cluster-id]

  2. After the workloads are finished, call the Amazon Redshift pause cluster API with the cluster ID to pause the cluster:
    aws redshift pause-cluster --cluster-identifier [periodic-running-cluster-id]

Results

After we migrated the workloads to the new architecture, the company’s analytics team ran some tests to verify the results.

According to tests, the performance of all workloads improved:

  • The BI workload is about 100% faster during the ETL workload running periods
  • The hourly ETL workload is about 50% faster
  • The daily workload duration reduced to approximately 40 minutes, from a maximum of 3 hours
  • The weekly workload duration reduced to approximately 1.5 hours, from a maximum of 4 hours

All functionalities work properly, and cost of the new architecture only increased approximately 13%, while over 10% of new data had been added during the testing period.

Learnings and limitations

After we separated the workloads into different Amazon Redshift clusters, we discovered a few things:

  • The performance of the BI workloads was 100% faster because there was no resource competition with daily and weekly ETL workloads anymore
  • The duration of ETL workloads on the periodic-running cluster was reduced significantly because there were more nodes and no resource competition from the BI and hourly ETL workloads
  • Even when over 10% new data was added, the overall cost of the Amazon Redshift clusters only increased by 13%, due to using the cluster pause and resume function of the Amazon Redshift RA3 family

As a result, we saw a 70% price-performance improvement of the Amazon Redshift cluster.

However, there are some limitations of the solution:

  • To use the Amazon Redshift pause and resume function, the code for calling the Amazon Redshift pause and resume APIs must be added to all scheduled scripts that run ETL workloads on the periodic-running cluster
  • Amazon Redshift clusters require several minutes to finish pausing and resuming, although you’re not charged during these processes
  • The size of Amazon Redshift clusters can’t automatically scale in and out depending on workloads

Next steps

After improving performance significantly, we can explore the possibility of reducing the number of nodes of the long-running cluster to reduce Amazon Redshift costs.

Another possible optimization is using Amazon Redshift Spectrum to reduce the cost of Amazon Redshift on cluster storage. With Redshift Spectrum, multiple Amazon Redshift clusters can concurrently query and retrieve the same structured and semistructured dataset in Amazon Simple Storage Service (Amazon S3) without the need to make copies of the data for each cluster or having to load the data into Amazon Redshift tables.

Amazon Redshift Serverless was announced for preview in AWS re:Invent 2021 and became generally available in July 2022. Redshift Serverless automatically provisions and intelligently scales your data warehouse capacity to deliver best-in-class performance for all your analytics. You only pay for the compute used for the duration of the workloads on a per-second basis. You can benefit from this simplicity without making any changes to your existing analytics and BI applications. You can also share data for read purposes across different Amazon Redshift Serverless instances within or across AWS accounts.

Therefore, we can explore the possibility of removing the need to script for pausing and resuming the periodic-running cluster by using Redshift Serverless to make the management easier. We can also explore the possibility of improving the granularity of workloads.

Conclusion

In this post, we discussed how to optimize workloads on Amazon Redshift clusters using RA3 nodes, data sharing, and pausing and resuming clusters. We also explored a use case implementing a multi-cluster two-way data share solution to improve workload performance with a minimum code change. If you have any questions or feedback, please leave them in the comments section.


About the authors

Jingbin Ma

Jingbin Ma is a Sr. Solutions Architect at Amazon Web Services. He helps customers build well-architected applications using AWS services. He has many years of experience working in the internet industry, and his last role was CTO of a New Zealand IT company before joining AWS. He is passionate about serverless and infrastructure as code.

Chao PanChao Pan is a Data Analytics Solutions Architect at Amazon Web Services. He’s responsible for the consultation and design of customers’ big data solution architectures. He has extensive experience in open-source big data. Outside of work, he enjoys hiking.

Bias in the machine: How can we address gender bias in AI?

Post Syndicated from Sue Sentance original https://www.raspberrypi.org/blog/gender-bias-in-ai-machine-learning-biased-data/

At the Raspberry Pi Foundation, we’ve been thinking about questions relating to artificial intelligence (AI) education and data science education for several months now, inviting experts to share their perspectives in a series of very well-attended seminars. At the same time, we’ve been running a programme of research trials to find out what interventions in school might successfully improve gender balance in computing. We’re learning a lot, and one primary lesson is that these topics are not discrete: there are relationships between them.

We can’t talk about AI education — or computer science education more generally — without considering the context in which we deliver it, and the societal issues surrounding computing, AI, and data. For this International Women’s Day, I’m writing about the intersection of AI and gender, particularly with respect to gender bias in machine learning.

The quest for gender equality

Gender inequality is everywhere, and researchers, activists, and initiatives, and governments themselves, have struggled since the 1960s to tackle it. As women and girls around the world continue to suffer from discrimination, the United Nations has pledged, in its Sustainable Development Goals, to achieve gender equality and to empower all women and girls.

While progress has been made, new developments in technology may be threatening to undo this. As Susan Leahy, a machine learning researcher from the Insight Centre for Data Analytics, puts it:

Artificial intelligence is increasingly influencing the opinions and behaviour of people in everyday life. However, the over-representation of men in the design of these technologies could quietly undo decades of advances in gender equality.

Susan Leavy, 2018 [1]

Gender-biased data

In her 2019 award-winning book Invisible Women: Exploring Data Bias in a World Designed for Men [2], Caroline Ceriado Perez discusses the effects of gender-biased data. She describes, for example, how the designs of cities, workplaces, smartphones, and even crash test dummies are all based on data gathered from men. She also discusses that medical research has historically been conducted by men, on male bodies.

Looking at this problem from a different angle, researcher Mayra Buvinic and her colleagues highlight that in most countries of the world, there are no sources of data that capture the differences between male and female participation in civil society organisations, or in local advisory or decision making bodies [3]. A lack of data about girls and women will surely impact decision making negatively. 

Bias in machine learning

Machine learning (ML) is a type of artificial intelligence technology that relies on vast datasets for training. ML is currently being use in various systems for automated decision making. Bias in datasets for training ML models can be caused in several ways. For example, datasets can be biased because they are incomplete or skewed (as is the case in datasets which lack data about women). Another example is that datasets can be biased because of the use of incorrect labels by people who annotate the data. Annotating data is necessary for supervised learning, where machine learning models are trained to categorise data into categories decided upon by people (e.g. pineapples and mangoes).

A banana, a glass flask, and a potted plant on a white surface. Each object is surrounded by a white rectangular frame with a label identifying the object.
Max Gruber / Better Images of AI / Banana / Plant / Flask / CC-BY 4.0

In order for a machine learning model to categorise new data appropriately, it needs to be trained with data that is gathered from everyone, and is, in the case of supervised learning, annotated without bias. Failing to do this creates a biased ML model. Bias has been demonstrated in different types of AI systems that have been released as products. For example:

Facial recognition: AI researcher Joy Buolamwini discovered that existing AI facial recognition systems do not identify dark-skinned and female faces accurately. Her discovery, and her work to push for the first-ever piece of legislation in the USA to govern against bias in the algorithms that impact our lives, is narrated in the 2020 documentary Coded Bias

Natural language processing: Imagine an AI system that is tasked with filling in the missing word in “Man is to king as woman is to X” comes up with “queen”. But what if the system completes “Man is to software developer as woman is to X” with “secretary” or some other word that reflects stereotypical views of gender and careers? AI models called word embeddings learn by identifying patterns in huge collections of texts. In addition to the structural patterns of the text language, word embeddings learn human biases expressed in the texts. You can read more about this issue in this Brookings Institute report

Not noticing

There is much debate about the level of bias in systems using artificial intelligence, and some AI researchers worry that this will cause distrust in machine learning systems. Thus, some scientists are keen to emphasise the breadth of their training data across the genders. However, other researchers point out that despite all good intentions, gender disparities are so entrenched in society that we literally are not aware of all of them. White and male dominance in our society may be so unconsciously prevalent that we don’t notice all its effects.

Three women discuss something while looking at a laptop screen.

As sociologist Pierre Bourdieu famously asserted in 1977: “What is essential goes without saying because it comes without saying: the tradition is silent, not least about itself as a tradition.” [4]. This view holds that people’s experiences are deeply, or completely, shaped by social conventions, even those conventions that are biased. That means we cannot be sure we have accounted for all disparities when collecting data.

What is being done in the AI sector to address bias?

Developers and researchers of AI systems have been trying to establish rules for how to avoid bias in AI models. An example rule set is given in an article in the Harvard Business Review, which describes the fact that speech recognition systems originally performed poorly for female speakers as opposed to male ones, because systems analysed and modelled speech for taller speakers with longer vocal cords and lower-pitched voices (typically men).

A women looks at a computer screen.

The article recommends four ways for people who work in machine learning to try to avoid gender bias:

  • Ensure diversity in the training data (in the example from the article, including as many female audio samples as male ones)
  • Ensure that a diverse group of people labels the training data
  • Measure the accuracy of a ML model separately for different demographic categories to check whether the model is biased against some demographic categories
  • Establish techniques to encourage ML models towards unbiased results

What can everybody else do?

The above points can help people in the AI industry, which is of course important — but what about the rest of us? It’s important to raise awareness of the issues around gender data bias and AI lest we find out too late that we are reintroducing gender inequalities we have fought so hard to remove. Awareness is a good start, and some other suggestions, drawn out from others’ work in this area are:

Improve the gender balance in the AI workforce

Having more women in AI and data science, particularly in both technical and leadership roles, will help to reduce gender bias. A 2020 report by the World Economic Forum (WEF) on gender parity found that women account for only 26% of data and AI positions in the workforce. The WEF suggests five ways in which the AI workforce gender balance could be addressed:

  1. Support STEM education
  2. Showcase female AI trailblazers
  3. Mentor women for leadership roles
  4. Create equal opportunities
  5. Ensure a gender-equal reward system

Ensure the collection of and access to high-quality and up-to-date gender data

We need high-quality dataset on women and girls, with good coverage, including country coverage. Data needs to be comparable across countries in terms of concepts, definitions, and measures. Data should have both complexity and granularity, so it can be cross-tabulated and disaggregated, following the recommendations from the Data2x project on mapping gender data gaps.

A woman works at a multi-screen computer setup on a desk.

Educate young people about AI

At the Raspberry Pi Foundation we believe that introducing some of the potential (positive and negative) impacts of AI systems to young people through their school education may help to build awareness and understanding at a young age. The jury is out on what exactly to teach in AI education, and how to teach it. But we think educating young people about new and future technologies can help them to see AI-related work opportunities as being open to all, and to develop critical and ethical thinking.

Three teenage girls at a laptop

In our AI education seminars we heard a number of perspectives on this topic, and you can revisit the videos, presentation slides, and blog posts. We’ve also been curating a list of resources that can help to further AI education — although there is a long way to go until we understand this area fully. 

We’d love to hear your thoughts on this topic.


References

[1] Leavy, S. (2018). Gender bias in artificial intelligence: The need for diversity and gender theory in machine learning. Proceedings of the 1st International Workshop on Gender Equality in Software Engineering, 14–16.

[2] Perez, C. C. (2019). Invisible Women: Exploring Data Bias in a World Designed for Men. Random House.

[3] Buvinic M., Levine R. (2016). Closing the gender data gap. Significance 13(2):34–37 

[4] Bourdieu, P. (1977). Outline of a Theory of Practice (No. 16). Cambridge University Press. (p.167)

The post Bias in the machine: How can we address gender bias in AI? appeared first on Raspberry Pi.

Auto-Diagnosis and Remediation in Netflix Data Platform

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/auto-diagnosis-and-remediation-in-netflix-data-platform-5bcc52d853d1

By Vikram Srivastava and Marcelo Mayworm

Netflix has one of the most complex data platforms in the cloud on which our data scientists and engineers run batch and streaming workloads. As our subscribers grow worldwide and Netflix enters the world of gaming, the number of batch workflows and real-time data pipelines increases rapidly. The data platform is built on top of several distributed systems, and due to the inherent nature of these systems, it is inevitable that these workloads run into failures periodically. Troubleshooting these problems is not a trivial task and requires collecting logs and metrics from several different systems and analyzing them to identify the root cause. At our scale, even a tiny percentage of disrupted workloads can generate a substantial operational support burden for the data platform team when troubleshooting involves manual steps. And we can’t discount the productivity impact it causes on data platform users.

It motivates us to be proactive in detecting and handling failed workloads in our production environment, avoiding interruptions that could slow down our teams. We have been working on an auto-diagnosis and remediation system called Pensive in the data platform to address these concerns. With the goal of troubleshooting failing and slow workloads and remediating them without human intervention wherever possible. As our platform continues to grow and different scenarios and issues can disrupt the workloads, Pensive has to be proactive in detecting broad problems at the platform level in real-time and diagnosing the impact across the workloads.

Pensive infrastructure comprises two separate systems to support batch and streaming workloads. This blog will explore these two systems and how they perform auto-diagnosis and remediation across our Big Data Platform and Real-time infrastructure.

Batch Pensive

Batch Pensive Architecture

Batch workflows in the data platform run using a Scheduler service that launches containers on the Netflix container management platform called Titus to run workflow steps. These steps launch jobs on clusters running Apache Spark and Presto via Genie. If a workflow step fails, Scheduler asks Pensive to diagnose the step’s error. Pensive collects logs for the failed jobs launched by the step from the relevant data platform components and then extracts the stack traces. Pensive relies on a regular expression based rules engine that has been curated over time. The rules encode information about whether an error is due to a platform issue or a user bug and whether the error is transient or not. If a regular expression from one of the rules matches, then Pensive returns information about that error to the Scheduler. If the error is transient, Scheduler will retry that step with exponential backoff a few more times.

The most critical part of Pensive is the set of rules used to classify an error. We need to evolve them as the platform evolves to ensure that the percentage of errors that Pensive cannot classify remains low. Initially, the rules were added on an ad-hoc basis as requests came in from platform component owners and users. We have now moved to a more systematic approach where unknown errors are fed into a Machine Learning process that performs clustering to propose new regular expressions for commonly occurring errors. We take the proposals to platform component owners to then come up with the classification of the error source and whether it is of transitory nature. In the future, we are looking to automate this process.

Detection of Platform-wide Issues

Pensive does error classification on individual workflow step failures, but by doing real-time analytics on the errors detected by Pensive using Apache Kafka and Apache Druid, we can quickly identify platform issues affecting many workflows. Once the individual diagnoses get stored in a Druid table, our monitoring and alerting system called Atlas does aggregations every minute and sends out alerts if there is a sudden increase in the number of failures due to platform errors. This has led to a dramatic reduction in the time it takes to detect issues in hardware or bugs in recently rolled out data platform software.

Streaming Pensive

Streaming Pensive Architecture
Streaming Pensive Architecture

Apache Flink powers real-time stream processing jobs in the Netflix data platform. And most of the Flink jobs run under a managed platform called Keystone, which abstracts out the underlying Flink job details and allows users to consume data from Apache Kafka streams and publish them to different data stores like Elasticsearch and Apache Iceberg on AWS S3.

Since the data platform manages keystone pipelines, users expect platform issues to be detected and remediated by the Keystone team without any involvement from their end. Furthermore, data in Kafka streams have a finite retention period, which adds time pressure for resolving the issues to avoid data loss.

For every Flink job running as part of a Keystone pipeline, we monitor the metric indicating how far the Flink consumer lags behind the Kafka producer. If it crosses a threshold, Atlas sends a notification to Streaming Pensive.

Like its batch counterpart, Streaming Pensive also has a rules engine to diagnose errors. However, in addition to logs, Streaming Pensive also has rules for checking various metric values for multiple components in the Keystone pipeline. The issue may occur in the source Kafka stream, the main Flink job, or the sinks to which the Flink job is writing data. Streaming Pensive diagnoses it and tries to remediate the issue automatically when it happens. Some examples where we are able to auto-remediate are:

  • If Streaming Pensive finds that one or more Flink Task Managers are going out of memory, it can redeploy the Flink cluster with more Task Managers.
  • If Streaming Pensive finds that there is an unexpected increase in the rate of incoming messages on the source Kafka cluster, it can increase the topic retention size and period so that we don’t lose any data while the consumer is lagging. If the spike goes away after some time, Streaming Pensive can revert the retention changes. Otherwise, it will page the job owner to investigate if there is a bug causing the increased rate or if the consumers need to be reconfigured to handle the higher rate.

Even though we have a high success rate, there are still occasions where automation is not possible. If manual intervention is required, Streaming Pensive will page the relevant component team to take timely action to resolve the issue.

What’s Next?

Pensive has had a significant impact on the operability of the Netflix data platform. And helped engineering teams lower the burden of operations work, freeing them to tackle more critical and challenging problems. But our job is nowhere near done. We have a long roadmap ahead of us. Some of the features and expansions we have planned are:

  • Batch Pensive is currently diagnosing failed jobs only, and we want to increase the scope into optimization to determine why jobs have become slow.
  • Auto-configure batch workflows so that they finish successfully or become faster and use fewer resources when possible. One example where it can dramatically help is Spark jobs, where memory tuning is a significant challenge.
  • Expand Pensive with Machine Learning classifiers.
  • The streaming platform recently added Data Mesh, and we need to expand Streaming Pensive to cover that.

Acknowledgments

This work could not have been completed without the help of the Big Data Compute and the Real-time Data Infrastructure teams within the Netflix data platform. They have been great partners for us as we work on improving the Pensive infrastructure.


Auto-Diagnosis and Remediation in Netflix Data Platform was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Data Engineers of Netflix — Interview with Pallavi Phadnis

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/data-engineers-of-netflix-interview-with-pallavi-phadnis-a1fcc5f64906

Data Engineers of Netflix — Interview with Pallavi Phadnis

This post is part of our “Data Engineers of Netflix” series, where our very own data engineers talk about their journeys to Data Engineering @ Netflix.

Pallavi Phadnis is a Senior Software Engineer at Netflix.

Pallavi Phadnis is a Senior Software Engineer on the Product Data Science and Engineering team. In this post, Pallavi talks about her journey to Netflix and the challenges that keep the work interesting.

Pallavi received her master’s degree from Carnegie Mellon. Before joining Netflix, she worked in the advertising and e-commerce industries as a backend software engineer. In her free time, she enjoys watching Netflix and traveling.

Her favorite shows: Stranger Things, Gilmore Girls, and Breaking Bad.

Pallavi, what’s your journey to data engineering at Netflix?

Netflix’s unique work culture and petabyte-scale data problems are what drew me to Netflix.

During earlier years of my career, I primarily worked as a backend software engineer, designing and building the backend systems that enable big data analytics. I developed many batch and real-time data pipelines using open source technologies for AOL Advertising and eBay. I also built online serving systems and microservices powering Walmart’s e-commerce.

Those years of experience solving technical problems for various businesses have taught me that data has the power to maximize the potential of any product.

Before I joined Netflix, I was always fascinated by my experience as a Netflix member which left a great impression of Netflix engineering teams on me. When I read Netflix’s culture memo for the first time, I was impressed by how candid, direct and transparent the work culture sounded. These cultural points resonated with me most: freedom and responsibility, high bar for performance, and no hiring of brilliant jerks.

Over the years, I followed the big data open-source community and Netflix tech blogs closely, and learned a lot about Netflix’s innovative engineering solutions and active contributions to the open-source ecosystem. In 2017, I attended the Women in Big Data conference at Netflix and met with several amazing women in data engineering, including our VP. At this conference, I also got to know my current team: Consolidated Logging (CL).

CL provides an end-to-end solution for logging, processing, and analyzing user interactions on Netflix apps from all devices. It is critical for fast-paced product innovation at Netflix since CL provides foundational data for personalization, A/B experimentation, and performance analytics. Moreover, its petabyte scale also brings unique engineering challenges. The scope of work, business impact, and engineering challenges of CL were very exciting to me. Plus, the roles on the CL team require a blend of data engineering, software engineering, and distributed systems skills, which align really well with my interests and expertise.

What is your favorite project?

The project I am most proud of is building the Consolidated Logging V2 platform which processes and enriches data at a massive scale (5 million+ events per sec at peak) in real-time. I re-architected our legacy data pipelines and built a new platform on top of Apache Flink and Iceberg. The scale brought some interesting learnings and involved working closely with the Apache Flink and Kafka community to fix core issues. Thanks to the migration to V2, we were able to improve data availability and usability for our consumers significantly. The efficiency achieved in processing and storage layers brought us big savings on computing and storage costs. You can learn more about it from my talk at the Flink forward conference. Over the last couple of years, we have been continuously enhancing the V2 platform to support more logging use cases beyond Netflix streaming apps and provide further analytics capabilities. For instance, I am recently working on a project to build a common analytics solution for 100s of Netflix studio and internal apps.

How’s data engineering similar and different from software engineering?

The data engineering role at Netflix is similar to the software engineering role in many aspects. Both roles involve designing and developing large-scale solutions using various open-source technologies. In addition to the business logic, they need a good understanding of the framework internals and infrastructure in order to ensure production stability, for example, maintaining SLA to minimize the impact on the upstream and downstream applications. At Netflix, it is fairly common for data engineers and software engineers to collaborate on the same projects.

In addition, data engineers are responsible for designing data logging specifications and optimized data models to ensure that the desired business questions can be answered. Therefore, they have to understand both the product and the business use cases of the data in depth.

In other words, data engineers bridge the gap between data producers (such as client UI teams) and data consumers (such as data analysts and data scientists.)

Learning more

Interested in learning more about data roles at Netflix? You’re in the right place! Keep an eye out for our open roles in Data Science and Engineering by visiting our jobs site here. Our culture is key to our impact and growth: read about it here. To learn more about our Data Engineers, check out our chats with Dhevi Rajendran, Samuel Setegne, and Kevin Wylie.


Data Engineers of Netflix — Interview with Pallavi Phadnis was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Data Engineers of Netflix — Interview with Kevin Wylie

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/data-engineers-of-netflix-interview-with-kevin-wylie-7fb9113a01ea

Data Engineers of Netflix — Interview with Kevin Wylie

This post is part of our “Data Engineers of Netflix” series, where our very own data engineers talk about their journeys to Data Engineering @ Netflix.

Kevin Wylie is a Data Engineer on the Content Data Science and Engineering team. In this post, Kevin talks about his extensive experience in content analytics at Netflix since joining more than 10 years ago.

Kevin grew up in the Washington, DC area, and received his undergraduate degree in Mathematics from Virginia Tech. Before joining Netflix, he worked at MySpace, helping implement page categorization, pathing analysis, sessionization, and more. In his free time he enjoys gardening and playing sports with his 4 kids.

His favorite TV shows: Ozark, Breaking Bad, Black Mirror, Barry, and Chernobyl

Since I joined Netflix back in 2011, my favorite project has been designing and building the first version of our entertainment knowledge graph. The knowledge graph enabled us to better understand the trends of movies, TV shows, talent, and books. Building the knowledge graph offered many interesting technical challenges such as entity resolution (e.g., are these two movie names in different languages really the same?), and distributed graph algorithms in Spark. After we launched the product, analysts and scientists began surfacing new insights that were previously hidden behind difficult-to-use data. The combination of overcoming technical hurdles and creating new opportunities for analysis was rewarding.

Kevin, what drew you to data engineering?

I stumbled into data engineering rather than making an intentional career move into the field. I started my career as an application developer with basic familiarity with SQL. I was later hired into my first purely data gig where I was able to deepen my knowledge of big data. After that, I joined MySpace back at its peak as a data engineer and got my first taste of data warehousing at internet-scale.

What keeps me engaged and enjoying data engineering is giving super-suits and adrenaline shots to analytics engineers and data scientists.

When I make something complex seem simple, or create a clean environment for my stakeholders to explore, research and test, I empower them to do more impactful business-facing work. I like that data engineering isn’t in the limelight, but instead can help create economies of scale for downstream analytics professionals.

What drew you to Netflix?

My wife came across the Netflix job posting in her effort to keep us in Los Angeles near her twin sister’s family. As a big data engineer, I found that there was an enormous amount of opportunity in the Bay Area, but opportunities were more limited in LA where we were based at the time. So the chance to work at Netflix was exciting because it allowed me to live closer to family, but also provided the kind of data scale that was most common for Bay Area companies.

The company was intriguing to begin with, but I knew nothing of the talent, culture, or leadership’s vision. I had been a happy subscriber of Netflix’s DVD-rental program (no late fees!) for years.

After interviewing, it became clear to me that this company culture was different than any I had experienced.

I was especially intrigued by the trust they put in each employee. Speaking with fellow employees allowed me to get a sense for the kinds of people Netflix hires. The interview panel’s humility, curiosity and business acumen was quite impressive and inspired me to join them.

I was also excited by the prospect of doing analytics on movies and TV shows, which was something I enjoyed exploring outside of work. It seemed fortuitous that the area of analytics that I’d be working in would align so well with my hobbies and interests!

Kevin, you’ve been at Netflix for over 10 years now, which is pretty incredible. Over the course of your time here, how has your role evolved?

When I joined Netflix back in 2011, our content analytics team was just 3 people. We had a small office in Los Angeles focused on content, and significantly more employees at the headquarters in Los Gatos. The company was primarily thought of as a tech company.

At the time, the data engineering team mainly used a data warehouse ETL tool called Ab Initio, and an MPP (Massively Parallel Processing) database for warehousing. Both were appliances located in our own data center. Hadoop was being lightly tested, but only in a few high-scale areas.

Fast forward 10 years, and Netflix is now the leading streaming entertainment service — serving members in over 190 countries. In the data engineering space, very little of the same technology remains. Our data centers are retired, Hadoop has been replaced by Spark, Ab Initio and our MPP database no longer fits our big data ecosystem.

In addition to the company and tech shifting, my role has evolved quite a bit as our company has grown. When we were a smaller company, the ability to span multiple functions was valued for agility and speed of delivery. The sooner we could ingest new data and create dashboards and reports for non-technical users to explore and analyze, the sooner we could deliver results. But now, we have a much more mature business, and many more analytics stakeholders that we serve.

For a few years, I was in a management role, leading a great team of people with diverse backgrounds and skill sets. However, I missed creating data products with my own hands so I wanted to step back into a hands-on engineering role. My boss was gracious enough to let me make this change and focus on impacting the business as an individual contributor.

As I think about my future at Netflix, what motivates me is largely the same as what I’ve always been passionate about. I want to make the lives of data consumers easier and to enable them to be more impactful. As the company scales and as we continue to invest in storytelling, the opportunity grows for me to influence these decisions through better access to information and insights. The biggest impact I can make as a data engineer is creating economies of scale by producing data products that will serve a diverse set of use cases and stakeholders.

If I can build beautifully simple data products for analytics engineers, data scientists, and analysts, we can all get better at Netflix’s goal: entertaining the world.

Learning more

Interested in learning more about data roles at Netflix? You’re in the right place! Keep an eye out for our open roles in Data Science and Engineering by visiting our jobs site here. Our culture is key to our impact and growth: read about it here. To learn more about our Data Engineers, check out our chats with Dhevi Rajendran and Samuel Setegne.


Data Engineers of Netflix — Interview with Kevin Wylie was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

How Netflix uses eBPF flow logs at scale for network insight

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/how-netflix-uses-ebpf-flow-logs-at-scale-for-network-insight-e3ea997dca96

By Alok Tiagi, Hariharan Ananthakrishnan, Ivan Porto Carrero and Keerti Lakshminarayan

Netflix has developed a network observability sidecar called Flow Exporter that uses eBPF tracepoints to capture TCP flows at near real time. At much less than 1% of CPU and memory on the instance, this highly performant sidecar provides flow data at scale for network insight.

Challenges

The cloud network infrastructure that Netflix utilizes today consists of AWS services such as VPC, DirectConnect, VPC Peering, Transit Gateways, NAT Gateways, etc and Netflix owned devices. Netflix software infrastructure is a large distributed ecosystem that consists of specialized functional tiers that are operated on the AWS and Netflix owned services. While we strive to keep the ecosystem simple, the inherent nature of leveraging a variety of technologies will lead us to challenges such as:

  • App Dependencies and Data Flow Mappings: With the number of micro services growing by the day without understanding and having visibility into an application’s dependencies and data flows, it is difficult for both service owners and centralized teams to identify systemic issues.
  • Pathway Validation: Netflix velocity of change within the production streaming and studio environment can result in the inability of services to communicate with other resources.
  • Service Segmentation: The ease of the cloud deployments has led to the organic growth of multiple AWS accounts, deployment practices, interconnection practices, etc. Without having network visibility, it’s difficult to improve our reliability, security and capacity posture.
  • Network Availability: The expected continued growth of our ecosystem makes it difficult to understand our network bottlenecks and potential limits we may be reaching.

Cloud Network Insight is a suite of solutions that provides both operational and analytical insight into the cloud network infrastructure to address the identified problems. By collecting, accessing and analyzing network data from a variety of sources like VPC Flow Logs, ELB Access Logs, eBPF flow logs on the instances, etc, we can provide network insight to users and central teams through multiple data visualization techniques like Lumen, Atlas, etc.

Flow Exporter

The Flow Exporter is a sidecar that uses eBPF tracepoints to capture TCP flows at near real time on instances that power the Netflix microservices architecture.

What is BPF?

Berkeley Packet Filter (BPF) is an in-kernel execution engine that processes a virtual instruction set, and has been extended as eBPF for providing a safe way to extend kernel functionality. In some ways, eBPF does to the kernel what JavaScript does to websites: it allows all sorts of new applications to be created.

An eBPF flow log record represents one or more network flows that contain TCP/IP statistics that occur within a variable aggregation interval.

The sidecar has been implemented by leveraging the highly performant eBPF along with carefully chosen transport protocols to consume less than 1% of CPU and memory on any instance in our fleet. The choice of transport protocols like GRPC, HTTPS & UDP is runtime dependent on characteristics of the instance placement.

The runtime behavior of the Flow Exporter can be dynamically managed by configuration changes via Fast Properties. The Flow Exporter also publishes various operational metrics to Atlas. These metrics are visualized using Lumen, a self-service dashboarding infrastructure.

So how do we ingest and enrich these flows at scale ?

Flow Collector is a regional service that ingests and enriches flows. IP addresses within the cloud can move from one EC2 instance or Titus container to another over time. We use Sonar to attribute an IP address to a specific application at a particular time. Sonar is an IPv6 and IPv4 address identity tracking service.

Flow Collector consumes two data streams, the IP address change events from Sonar via Kafka and eBPF flow log data from the Flow Exporter sidecars. It performs real time attribution of flow data with application metadata from Sonar. The attributed flows are pushed to Keystone that routes them to the Hive and Druid datastores.

The attributed flow data drives various use cases within Netflix like network monitoring and network usage forecasting available via Lumen dashboards and machine learning based network segmentation. The data is also used by security and other partner teams for insight and incident analysis.

Summary

Providing network insight into the cloud network infrastructure using eBPF flow logs at scale is made possible with eBPF and a highly scalable and efficient flow collection pipeline. After several iterations of the architecture and some tuning, the solution has proven to be able to scale.

We are currently ingesting and enriching billions of eBPF flow logs per hour and providing visibility into our cloud ecosystem. The enriched data allows us to analyze networks across a variety of dimensions (e.g. availability, performance, and security), to ensure applications can effectively deliver their data payload across a globally dispersed cloud-based ecosystem.

Special Thanks To

Brendan Gregg


How Netflix uses eBPF flow logs at scale for network insight was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Data Engineers of Netflix — Interview with Dhevi Rajendran

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/data-engineers-of-netflix-interview-with-dhevi-rajendran-a9ab7c7b36e5

Data Engineers of Netflix — Interview with Dhevi Rajendran

Dhevi Rajendran

This post is part of our “Data Engineers of Netflix” interview series, where our very own data engineers talk about their journeys to Data Engineering @ Netflix.

Dhevi Rajendran is a Data Engineer on the Growth Data Science and Engineering team. Dhevi joined Netflix in July 2020 and is one of many Data Engineers who have onboarded remotely during the pandemic. In this post, Dhevi talks about her passion for data engineering and taking on a new role during the pandemic.

Before Netflix, Dhevi was a software engineer at Two Sigma, where she was most recently on a data engineering team responsible for bringing in datasets from a variety of different sources for research and trading purposes. In her free time, she enjoys drawing, doing puzzles, reading, writing, traveling, cooking, and learning new things.

Her favorite TV shows: Atlanta, Barry, Better Call Saul, Breaking Bad, Dark, Fargo, Succession, The Killing

Her favorite movies: Das Leben der Anderen, Good Will Hunting, Intouchables, Mother, Spirited Away, The Dark Knight, The Truman Show, Up

Dhevi, so what got you into data engineering?

While my background has mostly been in backend software engineering, I was most recently doing backend work in the data space prior to Netflix. One great thing about working with data is the impact you can create as an engineer.

At Netflix, the work that data engineers do to produce data in a robust, scalable way is incredibly important to provide the best experience to our members as they interact with our service.

Beyond the really interesting technical challenges that come with working with big data, there are lots of opportunities to think about higher-level domain challenges as a data engineer. In college, I had done human-computer interaction research on subtitles for the Deaf and hard-of-hearing as well as computational genomics research on Alzheimer’s disease. I’ve always enjoyed learning about new areas and combining this knowledge with my technical skills to solve real-world problems.

What drew you to Netflix?

Netflix’s mission and its culture primarily drew me to Netflix. I liked the idea of being a part of a company that brings joy to so many members around the world with an incredibly powerful platform for their stories to be heard. The blend of creativity and a strong engineering culture at Netflix really appealed to me.

The culture was also something that piqued my interest. I was pretty skeptical of Netflix’s culture memo at first. Many companies have lofty ideals that don’t necessarily translate into the reality of the company culture, so I was surprised to see how consistently the culture memo aligns with the actual culture at the company. I’ve found the culture of freedom and responsibility empowering.

Rather than the typical top-down approach many companies use, Netflix trusts each person to make the right decisions for the company by using their deep knowledge of the problems they’re solving along with the context they gather from their leaders and stakeholders.

This means a lot less red tape, a lot less friction, and a lot more freedom for everyone at the company to do what’s best for the business. I also really appreciate the amount of visibility and input we get into broader strategic decisions that the company makes.

Finally, I was also really excited about joining the Growth Data Engineering team! My team is responsible for building data products relating to how we connect with our new members around the world, which is high-impact and has far-reaching global significance. I love that I get to help Netflix connect with new members around the world and help shape the first impression we make on them.

What is your favorite project or a project that you’re particularly proud of?

I have been primarily involved in the payments space. Not a project per se, but one of the things I’ve enjoyed being involved in is the cross-functional meetings with peers and stakeholders who are working in the payments space. These meetings include product managers, designers, consumer insights researchers, software engineers, data scientists, and people in a wide variety of other roles.

I love that I get to work cross-functionally with such a diverse group of people looking at the same set of problems from a variety of unique perspectives.

In addition to my day-to-day technical work, these meetings have provided me with the opportunity to be involved in the high-level product, design, and strategic discussions, which I value. Through these cross-functional efforts, I’ve also really gotten to learn and appreciate the nuances of payments. From using credit cards (which are fairly common in the US but not as widely adopted outside the US) to physically paying in person, members in different countries prefer to pay for our subscription in a wide variety of ways. It’s incredible to see the thoughtful and deeply member-driven approach we use to think about something as seemingly routine, straightforward, and often taken for granted as payments.

What was it like taking on a new role during the pandemic?

First off, I feel very lucky to have found a new role in this very difficult period. With the amount of change and uncertainty, the past year brought, it somehow felt both fitting and imprudent to voluntarily add a career change to the mix. The prospect was daunting at first. I knew there would be a bunch for me to learn coming into Netflix, considering that I hadn’t worked with the technologies my team uses (primarily Scala and Spark). Looking back now, I’m incredibly grateful for the opportunity and glad that I took it. I’ve already learned so much in the past six months and am excited about how much more I can learn and the impact I can make going forward.

Onboarding remotely has been a unique experience as well. Building relationships and gathering broader context are more difficult right now. I’ve found that I’ve learned to be more proactive and actively seek out opportunities to get to know people and the business, whether through setting up coffee chats, reading memos, or attending meetings covering topics I want to learn more about. I still haven’t met anyone I work with in person, but my teammates, my manager, and people across the company have been really helpful throughout the onboarding process.

It’s been incredible to see how gracious people are with their time and knowledge. The amount of empathy and understanding people have shown to each other, including to those who are new to the company, has made taking the leap and joining Netflix a positive experience.

Learning more

Interested in learning more about data roles at Netflix? You’re in the right place! Keep an eye out for our open roles in Data Science and Engineering here. Our culture is key to our impact and growth: read about it here.


Data Engineers of Netflix — Interview with Dhevi Rajendran was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Data Engineers of Netflix — Interview with Samuel Setegne

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/data-engineers-of-netflix-interview-with-samuel-setegne-f3027f58c2e2

Data Engineers of Netflix — Interview with Samuel Setegne

Samuel Setegne

This post is part of our “Data Engineers of Netflix” interview series, where our very own data engineers talk about their journeys to Data Engineering @ Netflix.

Samuel Setegne is a Senior Software Engineer on the Core Data Science and Engineering team. Samuel and his team build tools and frameworks that support data engineering teams across Netflix. In this post, Samuel talks about his journey from being a clinical researcher to supporting data engineering teams.

Samuel comes from West Philadelphia, and he received his Master’s in Biotechnology from Temple University. Before Netflix, Samuel worked at Travelers Insurance in the Data Science & Engineering space, implementing real-time machine learning models to predict severity and complexity at the onset of property claims.

His favorite TV shows: Bojack Horseman, Marco Polo, and The Witcher

His favorite movies: Scarface, I Am Legend and The Old Guard

Sam, what drew you to data engineering?

Early in my career, I was headed full speed towards life as a clinical researcher. Many healthcare practitioners had strong hunches and wild theories that were exciting to test against an empirical study. I personally loved looking at raw data and using it to understand patterns in the world through technology. However, most challenges that came with my role were domain-related but not as technically demanding. For example — clinical data was often small enough to fit into memory on an average computer and only in rare cases would its computation require any technical ingenuity or massive computing power. There was not enough scope to explore the distributed and large-scale computing challenges that usually come with big data processing. Furthermore, engineering velocity was often sacrificed owing to rigid processes.

Moving into pure Data engineering not only offered me the technical challenges I’ve always craved for but also the opportunity to connect the dots through data which was the best of both worlds.

What is your favorite project or a project you’re particularly proud of?

The very first project I had the opportunity to work on as a Netflix contractor was migrating all of Data Science and Engineering’s Python 2 code to Python 3. This was without a doubt, my favorite project that also opened the door for me to join the organization as a full-time employee. It was thrilling to analyze code from various cross-functional teams and learn different coding patterns and styles.

This kind of exposure opened up opportunities for me to engage with various data engineering teams and advocate for python best practices that helped me drive greater impact at Netflix.

What drew you to Netflix?

What initially caught my attention about a chance to work at Netflix was the variety and quality of content. My family and friends were always ecstatic about having lively and raucous conversations about Netflix shows or movies they recently watched like Marco Polo and Tiger King.

Although other great companies play a role in our daily lives, many of them serve as a kind of utility, whereas Netflix is meant to make us live, laugh, and love by enabling us to experience new voices, cultures, and perspectives.

After I read Netflix’s culture memo, I was completely sold. It precisely described what I always knew was missing in places I’ve worked before. I found the mantra of “people over process” extremely refreshing and eventually learned that it unlocked a bold and creative part of me in my technical designs. For instance, if I feel that a design of an application or a pipeline would benefit from new technology or architecture, I have the freedom to explore and innovate without excessive red tape. Typically in large corporations, you’re tied to strict and redundant processes, causing a lot of fatigue for engineers. When I landed at Netflix, it was a breath of fresh air to learn that we lean into freedom and responsibility and allow engineers to push the boundaries.

Sam, how do you approach building tools/frameworks that can be used across data engineering teams?

My team provides generalized solutions for common and repetitive data engineering tasks. This helps provide “paved path” solutions for data engineering teams and reduces the burden of re-inventing the wheel. When you have many specialized teams composed of highly skilled engineers, the last thing you want for a data engineer is to spend too much time solving small problems that are usually buried inside of the big, broad, and impactful problems. When we extrapolate that to every engineer on every Data Science & Engineering team, it easily adds up and is something worth optimizing.

Any time you have a data engineer spending cycles working on tasks where the data engineering part of their brain is turned off, that’s an opportunity where better tooling can help.

For example, many data engineering teams have to orchestrate notification campaigns when they make changes to critical tables that have downstream dependencies. This is achievable by a Data Engineer but it can be very time-consuming, especially having to track the migration of these downstream users over to your new table or table schema to ensure it’s safe to finalize your changes. This problem was tackled by one of my highly skilled team members who built a centralized migration service that lets Data Engineers easily start “migration campaigns” that can automatically identify downstream users and provide notification and status-tracking capabilities by leveraging Jira. The aim is to enable Data Engineers to quickly fire up one of these campaigns and keep an eye out for its completion while using that extra time to focus on other tasks.

By investing in the right tooling to streamline redundant (yet necessary) tasks, we can drive higher data engineering productivity and efficiency, while accelerating innovation for Netflix.

Learning more

Interested in learning more about data roles at Netflix? You’re in the right place! Keep an eye out for our open roles in Data Science and Engineering by visiting our jobs site here. Our culture is key to our impact and growth: read about it here. Check out our chat with Dhevi Rajendran to know more about starting a new role as a Data Engineer during the pandemic here.


Data Engineers of Netflix — Interview with Samuel Setegne was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Managing complexity in Zabbix installations with Splunk

Post Syndicated from Christian Anton original https://blog.zabbix.com/managing-complexity-in-zabbix-installations-with-splunk/13053/

A big data analytics engine can be used to optimize large and complex Zabbix installations: keeping track of the amount and kind of problems over time, top alert producers, and much more. You can employ Splunk to optimize and analyze vital Zabbix runtime parameters, such as ‘unsupported items,’ repeatedly happening host availability issues, misconfigured agents, and Zabbix Queue entries.

Contents

I. Complexity (1:15)
II. Zabbix entity inventory (8:28)
III. Use cases (15:16)
IV. Conclusion (20:09)
V. Questions & Answers (21:41)

secadm GmbH is a service provider located in the south of Germany. The company with a strong background in monitoring and automation, network infrastructure, and security software development supports customers of all sizes to manage their IT infrastructures. secadm GmbH is a Zabbix partner and also a Zabbix training partner.

Complexity

Operating a Zabbix deployment of a specific size comes with some challenges:

  • A huge number of hosts, templates, items, host groups, macros, and configuration elements inside your Zabbix instance.
  • LLD rules/unsupported items — items that are unable to fetch information, for example, a wrong password or a wrong path, in an external check. It is often hard to get a hold of how many of those you have and in which of the various error states. Therefore, it’s also difficult to fix them.
  • Host availability/network issues — errors that you see only in the logs — things going up and down, losing their connectivity, but getting back in time before issuing an alert.
  • Queue entries. In larger Zabbix installations, you might have ten thousand or even more items in this service queue. Zabbix actually tells you that some items do not receive their data in time. Zabbix shows that something is really wrong, though it doesn’t give a hint about what is wrong.
  • Zabbix as a monitoring tool is there to actually generate problems and alerts out of these problems. Many problems often cause ‘alert fatigue’ when people start ignoring monitoring results because of too many alerts.

Therefore, we receive a lot of questions from our customers, such as:

  • Where do all these problems come from?
  • What are the hosts generating most of the problems, at what times, and generated by what templates?
  • Did the latest change/upgrade have any negative impact on our monitoring?
  • Can you get rid of unsupported items?
  • How many hosts have specific problems (for instance, caused by a known bug in an old version of an agent that behaves strangely with a specific version of the Zabbix server), and what would be the effect if we fixed those problems?
  • Where do all these queue entries come from?

Zabbix is a transparent and predictable monitoring tool that offers great ways to organize the monitored elements with templates and macros. Zabbix also offers excellent visualization capabilities. However, Zabbix is not an analytical utility offering a flexible query language to gather the required information in the required format, having on-demand statistical functions, and allowing you to enrich and correlate data with the data from arbitrary sources. So, extra tools will have to do the extra work.

Secadm GmbH being the partner of Zabbix and Splunk, has concluded that it’s obvious to use Splunk for such extra work. Splunk is offering many possibilities to onboard data in the platform far beyond the simple indexing of log data, looking up the Key-Value store, implementing scripts and programs inside the Splunk platform to fetch data in real-time and on-demand out of other systems without having to store and to index any kind of information, as well as performing custom search commands.

Zabbix entity inventory

The most important Zabbix data used for analysis — the inventory of all elements inside Zabbix that do not often change, such as:

  • Hosts,
  • Items,
  • Proxies,
  • Templates,
  • Triggers,
  • Discovery rules (LLD),
  • Item Prototypes, and
  • Trigger Prototypes.

As this data is not changing constantly, we fetch this data from Splunk with the scheduled search and custom search command directly from the API endpoints in Zabbix. Then we can store this information inside the Splunk KV Store, which is, in fact, the MongoDB allowing us to perform searches in milliseconds without having to index any data and quickly get the results.

Zabbix entity inventory

So, you can get statistics on status and state to drill down on the unsupported items for a list of all of the items. You can further identify the correlation for the hostnames instead of host IDs, which are not human-readable. The hostnames are available at the KV Store, which stores the hosts with their metadata. You can also identify how many unsupported items there are on each host.

You can also get information on the hostnames, hosts, item names, item types, and errors. You can categorize the problems as SNMP problems, shell problems such as wrong paths, and see how often certain problems happen and what hosts are assigned to what templates and host groups, and so on. This information may also be aggregated or correlated with information from UCMDB.

More data

More fun than having data within a data analytics platform has more data.

  • Indexing the Zabbix Server / Proxy Logs logs, categorizing events to identify availability issues, item problems, preprocessing problems, housekeepers statistics, etc.

  • A module to fetch information from Zabbix (item, host, trigger) in real-time.

  • Gathering metrics (History / Trends data) directly from Zabbix in real-time without the need to store these metrics in any place other than the Zabbix database. We can still use the data for graphing, correlations, calculations, etc.

  • Onboarding the Zabbix problems into Splunk by using the new custom Media types — Webhook.

Custom Media type

  • Correlation of the alert logs, which are new and available through the API since Zabbix 5.0.
  • Working on the queue items to solve these questions.

Use cases

Zabbix queue

Zabbix queue may be a real headache as you can wait for a Zabbix installation with 20,000-50,000 items for 5 or 10 minutes or even longer.

In this dashboard, the same view is displayed in Zabbix: items are categorized by overtime, item type, proxy, etc. Splunk here offers what Zabbix fails — the history so that you can see the spikes when things have changed dramatically. For instance, when more significant network changes happen, the network slows down, and the queues grow dramatically. You can see whether these queues have gone back down or remained up. This information is complicated to analyze in Zabbix.

You can also drill down to see the items correlated with their actual status and the host’s status inside Zabbix. So, you can clearly see, for instance, that an item is on the host that is down or in the queue as it’s not supported and doesn’t get any data.

Here, there is also an Ignore list. So you can get statistics for the remaining items and group them, for instance, by Item type. You can go further and analyze and fix the problems.

Zabbix problems analytics

Zabbix problems dashboard

In this dashboard, Zabbix problems are displayed by system categories. For instance, we can see that over the last 24 hours, Windows caused most of the problems.

Here, we can also drill down to see, for instance, if there are many similar problems. You can go further to identify a single issue that has caused many alerts or problems. You can see that one host is creating almost all of the problems. So, if you switch this one host off, you would have fewer problems.

Zabbix data for management visibility

We can use Zabbix data for greater management visibility, such as:

  • Correlation of data to generate meaningful dashboards:

— Zabbix (metrics, status, problems, etc.),
— application logs,
— other data sources,
— inventory (CMDB, …)

  • Business-level visualization.

Conclusion

Splunk is open-source software and is distributed for free. We are currently in the process of integrating Splunk with Zabbix.

If you are interested in Splunk, you can send a request to [email protected]  or look for Christian Anton on LinkedIn or Instagram.

Questions & Answers

Question. If we use this kind of integration, are there any performance issues caused by Splunk or some misconfiguration?

Answer. We have been using Splunk for installations with several tens of thousands of monitored hosts and from hundreds of thousands up to millions of items and have not seen any performance implications.

Question. How does this connector work under the hood? Does it use the API or direct queries to the database?

Answer. We rely on the API. Besides, we can fetch the data directly from the database.

 

Optimizing data warehouse storage

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/optimizing-data-warehouse-storage-7b94a48fdcbe

By Anupom Syam

Background

At Netflix, our current data warehouse contains hundreds of Petabytes of data stored in AWS S3, and each day we ingest and create additional Petabytes. At this scale, we can gain a significant amount of performance and cost benefits by optimizing the storage layout (records, objects, partitions) as the data lands into our warehouse.

There are several benefits of such optimizations like saving on storage, faster query time, cheaper downstream processing, and an increase in developer productivity by removing additional ETLs written only for query performance improvement. On the other hand, these optimizations themselves need to be sufficiently inexpensive to justify their own processing cost over the gains they bring.

We built AutoOptimize to efficiently and transparently optimize the data and metadata storage layout while maximizing their cost and performance benefits.

This article will list some of the use cases of AutoOptimize, discuss the design principles that help enhance efficiency, and present the high-level architecture. Then deep dive into the merging use case of AutoOptimize and share some results and benefits.

Use cases

We found several use cases where a system like AutoOptimize can bring tons of value. Some of the optimizations are prerequisites for a high-performance data warehouse. Sometimes Data Engineers write downstream ETLs on ingested data to optimize the data/metadata layouts to make other ETL processes cheaper and faster. The goal of AutoOptimize is to centralize such optimizations that will remove duplicate work and while doing it more efficiently than vanilla ETLs.

Merge

As the data lands into the data warehouse through real-time data ingestion systems, it comes in different sizes. This results in a perpetually increasing number of small files across the partitions. Merging those numerous smaller files into a handful of larger files can make query processing faster and reduce storage space.

Sort

Presorted records and files in partitions make queries faster and save significant amounts of storage space as it enables a higher level of compression. We already had some existing tables with sorting stages to reduce table storage and improve downstream query performance.

Compaction

Modern data warehouses allow updating and deleting pre-existing records. Iceberg plans to enable this in the form of delta files. Over time, the number of delta files grows, and compacting them to their source files can make the read operations more optimal.

Metadata optimization

In Iceberg, the physical partitioning is decoupled from logical partitioning by keeping a map to file locations in the metadata. This enables us to add additional indexes in the metadata to make point queries more optimal. We can also reorganize the metadata to make file scanning much faster.

Design Principles

For AutoOptimize to efficiently optimize the data layout, we’ve made the following choices:

  1. Just in time vs. periodic optimization
    Only optimize a given data set when required (based on what changed) instead of blind periodic runs.
  2. Essential vs. complete optimization
    Allow users to optimize at the point of diminishing returns instead of a binary setting. For example, we allow a partition to have a few small files instead of always merging files in perfect sizes.
  3. Minimum replacement vs. full overwrite
    Only replace the required minimum amount of files instead of a full sweep overwrite.

These principles reduce resource usage by being more efficient and effective while lowering the end-to-end latency in data processing.

Other than these principles, there are some other design considerations to support and enable:

  • Multi-tenancy with database and table prioritization.
  • Both automatic (event-driven) as well as manual (ad-hoc) optimization.
  • Transparency to end-users.

High-Level Design

AutoOptimize High-Level Design

AutoOptimize is split into 2 subsystems (Service and Actors) to decouple the decisions from the actions at a high level. This decoupling of responsibilities helps us to design, manage, use, and scale the subsystems independently.

AutoOptimize Service

The service is the decision-maker. It decides what to do and when to do in response to an incoming event. It is responsible for listening to incoming events and requests and prioritizing different tables and actions to make the best usage of the available resources.

The work done in the service can be further broken down into the following 3 steps:

Observe: Listen to changes in the warehouse in near real-time. Also, respond to ad-hoc requests created manually by end-users.

Orient: Gather tuning parameters for a particular table that changed. Also, adjust the resource allocation for the table or the number of actors depending on the backlog.

Decide: Determine the highest value action with the right parameters for this particular change and when to act depending on how the action falls in the global priority across all tables and actions.

In AutoOptimize, the service is a cluster of Java (Spring Boot) applications using Redis to keep the states.

AutoOptimize Actors

Actors in AutoOptimize are responsible for the actual work (merging/sorting/compaction etc.). The AutoOptimize Service sends commands to the actors that specify what to do. The job of Actors is to perform those commands in a distributed and fault-tolerant manner.

Actors in AutoOptimize are a pool of long-running Spark jobs managed by the AutoOptimize service.

This was not intentional but we found that the way we modularized AutoOptimize’s decision-making workflow is very similar to the OODA loop and decided to use the same taxonomy.

Other Components

Iceberg
We use Apache Iceberg as the table format. AutoOptimize relies on some of the Iceberg specific features such as snapshot and atomic operations to perform the optimizations in an accurate and scalable manner.

AutoAnalyze
In short, AutoAnalyze finds the best tuning/configuration parameters for a table. It uses “What-If” experiments and previous experiences and heuristics to find the most fitting attributes for a table. We will publish a follow-up blog post about AutoAnalyze in the future. For AutoOptimize, it may find if a table needs file merging or suggest a target file size and other parameters.

Deep Dive into File Merge

File merge is the first use-case that we built for AutoOptimize. Previously we had our homegrown system called Ursula responsible for data ingestion into the Hive based warehouse. The Ursula based pipeline also performed file merges on the ingested table partitions periodically. Since then, we have moved our ingestion to Keystone and our table layout to Iceberg.

The migration out of Ursula to Keystone/Iceberg based ingestion initiated the need for a replacement for Ursula file merge. File merging is necessary for a low latency streaming ingestion pipeline as data often arrive late and unevenly. The number of small files cripples across partitions over time and can have some serious side effects like:

  1. Slowing down queries.
  2. More processing resources.
  3. Increase in storage space.

The goal of File merge in AutoOptimize is to efficiently reduce the side effects while not adding additional latency to the data pipeline.

Solutions

This section will discuss some of the solutions that helped us achieve the previously stated goals.

Just in time optimization

AutoOptimize file merge gets triggered via table change events. This allows AutoOptimize to act right away with a minimum lag. But the problem with being event-driven is it’s expensive to scan the changed partitions every time they change. If we can determine “how noisy” a partition is from the changesets in a rolling manner, we will eliminate unnecessary full partition scanning with early signals from snapshots.

Essential work

After a full partition scan, AutoOptimize gets a more comprehensive view of the state of the partition. We can get a more accurate state of the partition at this stage and avoid non-essential work.

Partition Entropy
We introduced a concept called Partition Entropy (PE) used for early pruning at each step to reduce actual work. It’s a set of stats about the state of the partition. We calculate this in a rolling manner after each snapshot scan and more exhaustively after each partition scan.

The parts of PE that deal with file sizes are called File Size Entropy (FSE). FSE of a partition is derived from the Mean Squared Error (MSE) of file sizes in a partition. We will use the terms FSE and MSE interchangeably.

We use the standard Mean Squared Error formula:

Where,

N = Number of files in the partition
Target = Target File Size
Actual = min(Actual File Size, Target)

When a partition is scanned, it’s easy to calculate the MSE using the above formula as we know the sizes of all files in that partition. We store the MSE and N for each partition in Redis for later use.

At the snapshot scan stage, we get a commit definition containing the list of files and their metadata (like size, number of records, etc.) that got added and deleted in the commit. We calculate the new MSE’ of a changed partition in a rolling manner from the snapshot information and the previously stored stats using this formula:

Where,

M = Number of files added in the snapshot.
Target = Target File Size.
Actual = min(Actual File Size, Target)
N = Previously stored number of files in the partition.
MSE = Previously stored MSE.

We have a tolerance threshold (T) for each partition and skip further processing of the partition if MSE < T². This helps us significantly reduce the number of full partition scans at the snapshot scan step and the number of actual merges in the partition scan stage.

Entropy-Based Filtering

The actual formulas are a little bit more complicated than what stated here, as we need to take care of deleted files and some other edge cases. We could also use Mean Absolute Error but we want to be biased towards outliers — as the goal is to have a more even file size in a partition than having a mixed bag of different sizes with some perfect sized files.

Minimum replacement

Once we start processing a partition, we find the minimum amount of work needed to reduce the File Size Entropy and thus reduce the number of small files.

We use 2 different packing algorithms to achieve this:

Knuth/Plass line breaking algorithm
We use this strategy when the sort order among files is important. With a correct error function (ex: Error²), this algorithm helps minimize MSE with a bounding run time of O(n²).

First Fit Decreasing bin packing algorithm
We use a modified version of the original FFD algorithm if we can ignore the sort order. This helps reduce the number of replacements with an O(nlog(n)) running time.

These methods help us smooth out the file size histogram while doing it optimally with minimal file replacement.

Multi-tenancy

AutoOptimize is multi-tenant; that is, it runs on many different databases and tables. When running the optimizations, it also needs to prioritize and allocate resources at different levels for different tasks. It requires answering questions like which table should be processed first or get more resource bandwidth or what optimization gives the most ROI.

To support multi-tenancy and tasks prioritization, it needs to have the following properties:

  • Weighted resource sharing across different priorities.
  • Fair resource sharing across different tables and tasks with the same priority.
  • Handle bursts to prevent starvation.

We use different types of Weighted Fair Queue implementations inside AutoOptimize, including different combinations of the followings:

  1. Weighted Round Robin
  2. Deficit Weighted Round Robin
  3. Fixed Priority Preemptive

Reliable Priority Queue
To support prioritization and fair resource usage, we introduced a concept called Reliable Priority Queue (RPQ) in AutoOptimize. A reliable queue does not lose items if the subscriber fails to process the items after a dequeue. An RPQ also has a sense of prioritization across different items while being reliable. The concept is fairly similar to the default Redis RPOPLPUSH reliable queue pattern. But for AutoOptimize’s use case, we use Sorted Sets instead of lists to enable prioritization.

The goal of AutoOptimize is to optimize the warehouse with a holistic perspective. Making it multi-tenant with a notion of different priorities helps us make the most optimal resource allocation.

Results

22% reduction in partition scans

2% reduction in merge actions

72% reduction in file replacements

These savings are stacked on top of each other as they are applied in sequence in the AutoOptimize pipeline. This results in a massive reduction in actual processing need while reducing the number of files by 80%.

80% reduction in the number of files

70% saving in compute

We are using 70% less compute instances than our previous merge implementation.

We also see up to 60% improvement in query performance and an additional 1% saving in storage.

Benefits

Increase processing efficiency: As AutoOptimize uses file replacement and can avoid processing by filtering early, it can save processing costs by skipping files that are not required to be merged.

Increase storage efficiency: AutoOptimize helps save storage costs by enabling AutoAnalyze recommendations to sort the records.

Reduce lag: Periodic overwrite ETLs take more time as it works in batches. AutoOptimize reduces end to end lag in data processing by optimizing as we go.

Faster query: A smaller number of files results in smaller file scanning, fewer network calls, and makes queries faster.

Ease of use: AutoOptimize provides a frictionless way to setup optimization with minimum maintenance overhead from Data Engineering.

Developer productivity: Instead of adding an ETL per table for merging, which adds ongoing incremental maintenance cost, we have a single solution that can transparently scale to many tables.

Conclusion

We believe the problems we faced at Netflix are not unique, and some of the techniques and design considerations we made can be applied more generally. By laying out the data intelligently as they are ingested into the warehouse, we are removing complexities for Data Engineers and accelerating the end-to-end pipeline. At the same time, we are gaining a significant amount of performance and cost improvement by optimizing only when it makes sense. We plan to extend AutoOptimize into other use cases and integrate it more with the Iceberg ecosystem in the future.


Optimizing data warehouse storage was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Bulk vs Individual Compression

Post Syndicated from Bozho original https://techblog.bozho.net/bulk-vs-individual-compression/

I’d like to share something very brief and very obvious – that compression works better with large amounts of data. That is, if you have to compress 100 sentences you’d better compress them in bulk rather than once sentence at a time. Let me illustrate that:

public static void main(String[] args) throws Exception {
    List<String> sentences = new ArrayList<>();
    for (int i = 0; i < 100; i ++) {
        StringBuilder sentence = new StringBuilder();
        for (int j = 0; j < 100; j ++) { 
          sentence.append(RandomStringUtils.randomAlphabetic(10)).append(" "); 
        } 
        sentences.add(sentence.toString()); 
    } 
    byte[] compressed = compress(StringUtils.join(sentences, ". ")); 
    System.out.println(compressed.length); 
    System.out.println(sentences.stream().collect(Collectors.summingInt(sentence -> compress(sentence).length)));
}

The compress method is using commons-compress to easily generate results for multiple compression algorithms:

public static byte[] compress(String str) {
   if (str == null || str.length() == 0) {
       return new byte[0];
   }
   ByteArrayOutputStream out = new ByteArrayOutputStream();
   try (CompressorOutputStream gzip = new CompressorStreamFactory()
           .createCompressorOutputStream(CompressorStreamFactory.GZIP, out)) {
       gzip.write(str.getBytes("UTF-8"));
       gzip.close();
       return out.toByteArray();
   } catch (Exception ex) {
       throw new RuntimeException(ex);
   }
}

The results are as follows, in bytes (note that there’s some randomness, so algorithms are not directly comparable):

Algorithm Bulk Individual
GZIP 6590 10596
LZ4_FRAMED 9214 10900
BZIP2 6663 12451

Why is that an obvious result? Because of the way most compression algorithms work – they look for patterns in the raw data and create a map of those patterns (a very rough description).

How is that useful? In big data scenarios where the underlying store supports per-record compression (e.g. a database or search engine), you may save a significant amount of disk space if you bundle multiple records into one stored/indexed record.

This is not a generically useful advice, though. You should check the particular datastore implementation. For example MS SQL Server supports both row and page compression. Cassandra does compression on an SSTable level, so it may not matter how you structure your rows. Certainly, if storing data in files, storing it in one file and compressing it is more efficient than compressing multiple files separately.

Disk space is cheap so playing with data bundling and compression may be seen as premature optimization. But in systems that operate on large datasets it’s a decision that can save you a lot of storage costs.

The post Bulk vs Individual Compression appeared first on Bozho's tech blog.

Running a high-performance SAS Grid Manager cluster on AWS with Amazon FSx for Lustre

Post Syndicated from Neelam original https://aws.amazon.com/blogs/big-data/running-a-high-performance-sas-grid-manager-cluster-on-aws-with-amazon-fsx-for-lustre/

SAS® is a software provider of data science and analytics used by enterprises and government organizations. SAS Grid is a highly available, fast processing analytics platform that offers centralized management that balances workloads across different compute nodes. This application suite is capable of data management, visual analytics, governance and security, forecasting and text mining, statistical analysis, and environment management. SAS and AWS recently performed testing using the Amazon FSx for Lustre shared file system to determine how well a standard workload performs on AWS using SAS Grid Manager. For more information about the results, see the whitepaper Accelerating SAS Using High-Performing File Systems on Amazon Web Services.

In this post, we take a look at an approach to deploy underlying AWS infrastructure to run SAS Grid with FSx for Lustre that you can also apply to similar applications with demanding I/O requirements.

System design overview

Running high-performance workloads that use throughput heavily, with sensitivity to network latency, requires approaches outside of typical applications. AWS generally recommends that applications span multiple Availability Zones for high availability. In the case of latency sensitivity, high throughput applications traffic should be local for optimal performance. To maximize throughput, you can do the following:

  • Run in a virtual private cloud (VPC), using instance types that support enhanced networking
  • Run instances in the same Availability Zone
  • Run instances within a placement group

The following diagram illustrates the SAS Grid with FSx for Lustre architecture on AWS.

SAS Grid architecture consists of mid-tier nodes, metadata servers, and Grid compute nodes. The mid-tier nodes are responsible for running the Platform Web Services (PWS) and Load Sharing Facility (LSF) components. These components dispatch jobs submitted and return the status of each job.

To effectively run PWS and LSF on mid-tier nodes, you need Amazon Elastic Compute Cloud (Amazon EC2) instances with high memory. For this use case, the r5 instance family would meet this requirement.

Metadata servers contain the metadata repository that stores the metadata definitions of all SAS Grid manager products, which the r5 instance family can also serve effectively. We recommend either meeting or exceeding the recommended memory requirement of 24 GB of RAM or 8 MB per physical core (whichever is larger). Metadata servers don’t need compute-intensive resources or high I/O bandwidth; therefore, you can choose the r5 instance family for a balance of price and performance.

SAS Grid nodes are responsible for executing the jobs received by the grid, and EC2 instances capable of handling these jobs depend on the size, complexity, and volume of the work the grid performs. To meet the minimum requirements of SAS Grid workloads, we recommend having a minimum of 8 GB of physical RAM per core and a robust I/O throughput of 100–125 MB/second per physical core. For this use case, EC2 instance families of m5n and r5n suffice in meeting the RAM and throughput requirements. You can host SASDATA, SASWORK, and UTILLOC libraries in a shared file system. If you choose to offload SASWORK to instance storage, the i3en instance family meets this need because they support instance storage over 1.2 TB. In the next section, we take a look at how throughput testing was performed to arrive at the EC2 instance recommendations with FSx for Lustre.

Steps to maximize storage I/O performance

SAS Grid requires a shared file system, and we wanted to benchmark the performance of FSx for Lustre as the chosen shared file system against various EC2 instance families that meet the minimum requirements of 8 GB of physical RAM per core and 100–125 MB/second throughput per physical core.

FSx for Lustre is a fully managed file storage service designed for applications that require fast storage. As a POSIX-compliant file system, you can use FSx for Lustre with current Linux-based applications without having to make any changes. Although FSx for Lustre offers a choice between scratch and persistent type file systems, we recommend for SAS Grid to use persistent type FSx for Lustre file system because you need to store the SASWORK, SASDATA, and UTILLOC data and libraries for longer periods with high availability and data durability. To meet I/O throughput, make sure to select the appropriate storage capacity for throughput per unit of storage to achieve the desired range of 100–125 MB/second.

After setting up the file system, we recommend mounting FSx for Lustre with the flock mount option. The following code example is a mount command and mount option for FSx for Lustre:

$ sudo mount -t lustre -o noatime,flock fs-0123456789abcd.fsx.us-west- [email protected]:/za3atbmv /fsx
$ mount -t lustre
[email protected]:/za3atbmv on /fsx type lustre

(rw,noatime,seclabel,flock,lazystatfs)

Throughput testing and results

To select the best-placed EC2 instances for running SAS Grid with FSx for Lustre, we ran a series of highly parallel network throughput tests from individual EC2 instances against a 100.8 TiB persistent file system that had an aggregate throughput capacity of 19.688 GB/second. We ran these tests in multiple regions using multiple EC2 instance families (c5, c5n, i3, i3en, m5, m5a, m5ad, m5n, m5dn, r5, r5a, r5ad, r5n, and r5dn). The tests ran for 3 hours for each instance, and the DataWriteBytes metric of the file system was recorded every 1 minute. Only one instance was accessing the file system at a time, and the p99.9 results were captured. The metrics were consistent across all four Regions.

We observed that the i3en, m5n, m5dn, r5n, and r5dn EC2 instance families meet or exceed the minimum network performance and memory recommendations. For more information about the performance results, see the whitepaper Accelerating SAS Using High-Performing File Systems on Amazon Web Services. The i3 instance family is just shy of meeting the minimum network performance. If you want to use the instance storage for SASWORK and UTILLOC libraries, you can consider i3en instances.

M5n and r5n are a good blend of price and performance, and we recommend the m5n instance family for SAS Grid nodes. However, if your workload is memory bound, consider using r5n instances, which provide higher memory per physical core for a higher price point than m5n instances.

We also ran rhel_iotest.sh, which is available from the SAS technical support samples tool repository (SASTSST), using the same FSx for Lustre configuration as mentioned earlier. The following table shows the read and write performance per physical core for a variety of instances sizes in the m5n and r5n families.

Instance Type

Variable Network Performance Peak per Physical Core
Read (MB/second) Write (MB/second)
m5n.large 850.20 357.07
m5n.xlarge 519.46 386.25
m5n.2xlarge 283.01 446.84
m5n.4xlarge 202.89 376.57
m5n.8xlarge 154.98 297.71
r5n.large 906.88 429.93
r5n.xlarge 488.36 455.76
r5n.2xlarge 256.96 471.65
r5n.4xlarge 203.31 390.03
r5n.8xlarge 149.63 299.45

To take advantage of the elasticity, scalability, and flexibility of the cloud, we recommend spreading the SAS Grid and compute workload over a larger number of smaller instances versus using a smaller number of larger instances. For mid-tier, use a minimum of two instances, and for metadata servers, we recommend a minimum of three instances for the SAS Grid architecture.

Conclusions

Before FSx for Lustre file system, you either had to use Amazon Elastic File System (Amazon EFS) or a third-party file system from AWS Marketplace and Amazon Elastic Block Store (Amazon EBS) for the SASWORK, SASDATA, and UTILLOC libraries and storage data. Each storage option came with its own settings and limitations, which caused loss in performance. With FSx for Lustre, you have a single solution for all SAS Grid storage requirements, which allows you to focus on running your business instead of maintaining a file system. We recommend that SAS admin deploy SAS Grid with m5n and r5n instances for SAS Grid compute nodes when accessing FSx for Lustre file system.

If you have questions or suggestions, please leave a comment.

Hyper Scale VPC Flow Logs enrichment to provide Network Insight

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/hyper-scale-vpc-flow-logs-enrichment-to-provide-network-insight-e5f1db02910d

How Netflix is able to enrich VPC Flow Logs at Hyper Scale to provide Network Insight

By Hariharan Ananthakrishnan and Angela Ho

The Cloud Network Infrastructure that Netflix utilizes today is a large distributed ecosystem that consists of specialized functional tiers and services such as DirectConnect, VPC Peering, Transit Gateways, NAT Gateways, etc. While we strive to keep the ecosystem simple, the inherent nature of leveraging a variety of technologies will lead us to complications and challenges such as:

  • App Dependencies and Data Flow Mappings: Without understanding and having visibility into an application’s dependencies and data flows, it is difficult for both service owners and centralized teams to identify systemic issues.
  • Pathway Validation: Netflix velocity of change within the production streaming environment can result in the inability of services to communicate with other resources.
  • Service Segmentation: The ease of the cloud deployments has led to the organic growth of multiple AWS accounts, deployment practices, interconnection practices, etc. Without having network visibility, it’s not possible to improve our reliability, security and capacity posture.
  • Network Availability: The expected continued growth of our ecosystem makes it difficult to understand our network bottlenecks and potential limits we may be reaching.

Cloud Network Insight is a suite of solutions that provides both operational and analytical insight into the Cloud Network Infrastructure to address the identified problems. By collecting, accessing and analyzing network data from a variety of sources like VPC Flow Logs, ELB Access Logs, Custom Exporter Agents, etc, we can provide Network Insight to users through multiple data visualization techniques like Lumen, Atlas, etc.

VPC Flow Logs

VPC Flow Logs is an AWS feature that captures information about the IP traffic going to and from network interfaces in a VPC. At Netflix we publish the Flow Log data to Amazon S3. Flow Logs are enabled tactically on either a VPC or subnet or network interface. A flow log record represents a network flow in the VPC. By default, each record captures a network internet protocol (IP) traffic flow (characterized by a 5-tuple on a per network interface basis) that occurs within an aggregation interval.

version vpc-id subnet-id instance-id interface-id account-id type srcaddr dstaddr srcport dstport pkt-srcaddr pkt-dstaddr protocol bytes packets start end action tcp-flags log-status
3 vpc-12345678 subnet-012345678 i-07890123456 eni-23456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43416 5001 52.213.180.42 10.0.0.62 6 568 8 1566848875 1566848933 ACCEPT 2 OK

The IP addresses within the Cloud can move from one EC2 instance or Titus container to another over time. To understand the attributes of each IP back to an application metadata Netflix uses Sonar. Sonar is an IPv4 and IPv6 address identity tracking service. VPC Flow Logs are enriched using IP Metadata from Sonar as it is ingested.

With a large ecosystem at Netflix, we receive hundreds of thousands of VPC Flow Log files in S3 each hour. And in order to gain visibility into these logs, we need to somehow ingest and enrich this data.

So how do we ingest all these s3 files?

At Netflix, we have the option to use Spark as our distributed computing platform. It is easier to tune a large Spark job for a consistent volume of data. As you may know, S3 can emit messages when events (such as a file creation events) occur which can be directed into an AWS SQS queue. In addition to the s3 object path, these events also conveniently include file size which allows us to intelligently decide how many messages to grab from the SQS queue and when to stop. What we get is a group of messages representing a set of s3 files which we humorously call “Mouthfuls”. In other words, we are able to ensure that our Spark app does not “eat” more data than it was tuned to handle.

We named this library Sqooby. It works well for other pipelines that have thousands of files landing in s3 per day. But how does it hold up to the likes of Netflix VPC Flow Logs that has volumes which are orders of magnitude greater? It didn’t. The primary limitation was that AWS SQS queues have a limit of 120 thousand in-flight messages. We found ourselves needing to hold more than 120 thousand messages in flight at a time in order to keep up with the volumes of files.

Requirements

There are multiple ways you can solve this problem and many technologies to choose from. As with any sustainable engineering design, focusing on simplicity is very important. This means using existing infrastructure and established patterns within the Netflix ecosystem as much as possible and minimizing the introduction of new technologies.

Equally important is the resilience, recoverability, and supportability of the solution. A malformed file should not hold up or back up the pipeline (resilience). If unexpected environmental factors cause the pipeline to get backed up, it should be able to recover by itself. And excellent logging is needed for debugging purposes and supportability. These characteristics allow for an on-call response time that is relaxed and more in line with traditional big data analytical pipelines.

Hyper Scale

At Netflix, our culture gives us the freedom to decide how we solve problems as well as the responsibility of maintaining our solutions so that we may choose wisely. So how did we solve this scale problem that meets all of the above requirements? By applying existing established patterns in our ecosystem on top of Sqooby. In this case, it’s a pattern which generates events (directed into another AWS SQS queue) whenever data lands in a table in a datastore. These events represent a specific cut of data from the table.

We applied this pattern to the Sqooby log tables which contained information about s3 files for each Mouthful. What we got were events that represented Mouthfuls. Spark could look up and retrieve the data in the s3 files that the Mouthful represented. This intermediate step of persisting Mouthfuls allowed us to easily “eat” through S3 event SQS messages at great speed, converting them to far fewer Mouthful SQS Messages which would each be consumed by a single Spark app instance. Because we ensured that our ingestion pipeline could concurrently write/append to the final VPC Flow Log table, this meant that we could scale out the number of Spark app instances we spin up.

Tuning for Hyper Scale

On this journey of ingesting VPC flow logs, we found ourselves tweaking configurations in order to tune throughput of the pipeline. We modified the size of each Mouthful and tuned the number of Spark executors per Spark app while being mindful of cluster capacity. We also adjusted the frequency in which Spark app instances are spun up such that any backlog would burn off during a trough in traffic.

Summary

Providing Network Insight into the Cloud Network Infrastructure using VPC Flow Logs at hyper scale is made possible with the Sqooby architecture. After several iterations of this architecture and some tuning, Sqooby has proven to be able to scale.

We are currently ingesting and enriching hundreds of thousands of VPC Flow Logs S3 files per hour and providing visibility into our cloud ecosystem. The enriched data allows us to analyze networks across a variety of dimensions (e.g. availability, performance, and security), to ensure applications can effectively deliver their data payload across a globally dispersed cloud-based ecosystem.

Special Thanks To

Bryan Keller, Ryan Blue


Hyper Scale VPC Flow Logs enrichment to provide Network Insight was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

AWS Architecture Monthly Magazine: Data Lakes

Post Syndicated from Annik Stahl original https://aws.amazon.com/blogs/architecture/aws-architecture-monthly-magazine-data-lakes/

A data lake is the fastest way to get answers from all your data to all your users. It’s a centralized repository that allows you to store all your structured and unstructured data at any scale. You can store your data as-is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning—to guide better decisions.

In This Issue

In this month’s Architecture Monthly, we speak to AWS Analytics Tech Leader, Taz Sayed, about general architecture trends in data lakes, the questions customers need to ask themselves before considering a data lake, and we get his outlook on the role the cloud will play in future development efforts.

We also introduce you to two companies that are utilizing data lakes for deep analytics, point you to an AWS managed solution, provide some real-world videos, and more.

  • Ask an Expert: Taz Sayed, Tech Leader, AWS Analytics
  • Blog: Kayo Sports builds real-time view of the customer on AWS
  • Case Study: Yulu Uses a Data Lake on AWS to Pedal a Change
  • Solution: Data Lake on AWS
  • Managed Solution: AWS Lake Formation
  • Whitepaper: Building Big Data Storage Solutions (Data Lakes) for Maximum Flexibility

How to Access the Magazine

We hope you’re enjoying Architecture Monthly, and we’d like to hear from you—leave us star rating and comment on the Amazon Kindle Newsstand page or contact us anytime at [email protected].

Delta: A Data Synchronization and Enrichment Platform

Post Syndicated from Netflix Technology Blog original https://medium.com/netflix-techblog/delta-a-data-synchronization-and-enrichment-platform-e82c36a79aee?source=rss----2615bd06b42e---4

Part I: Overview

Andreas Andreakis, Falguni Jhaveri, Ioannis Papapanagiotou, Mark Cho, Poorna Reddy, Tongliang Liu

Overview

It is a commonly observed pattern for applications to utilize multiple datastores where each is used to serve a specific need such as storing the canonical form of data (MySQL etc.), providing advanced search capabilities (ElasticSearch etc.), caching (Memcached etc.), and more. Typically when using multiple datastores, one of them acts as the primary store, and the others as derived stores. Now the challenge becomes how to keep these datastores in sync.

We have observed a series of distinct patterns which have tried to address multi-datastore synchronization, such as dual writes, distributed transactions, etc. However, these approaches have limitations in regards to feasibility, robustness, and maintenance. Beyond data synchronization, some applications also need to enrich their data by calling external services.

To address these challenges, we developed Delta. Delta is an eventual consistent, event driven, data synchronization and enrichment platform.

Existing Solutions

Dual Writes

In order to keep two datastores in sync, one could perform a dual write, which is executing a write to one datastore following a second write to the other. The first write can be retried, and the second can be aborted should the first fail after exhausting retries. However, the two datastores can get out of sync if the write to the second datastore fails. A common solution is to build a repair routine, which can periodically re-apply data from the first to the second store, or does so only if differences are detected.

Issues:
Implementing the repair routine typically is tailored work which may not be reusable. Also, data between the stores remain out of sync until the repair routine is applied. The solution can become increasingly complicated if more than two datastores are involved. Finally, the repair routine can add substantial stress to the primary data source during its activity.

Change Log Table

When mutations (like an insert, update and delete) occur on a set of tables, entries for the changes are added to the log table as part of the same transaction. Another thread or process is constantly polling events from the log table and writes them to one or multiple datastores, optionally removing events from the log table after acknowledged by all datastores.

Issues:
This needs to be implemented as a library and ideally without requiring code changes for the application using it. In a polyglot environment this library implementation needs to be repeated for each supported language and it is challenging to ensure consistent features and behavior across languages.

Another issue exists for the capture of schema changes, where some systems, like MySQL, don’t support transactional schema changes [1][2]. Therefore, the pattern to execute a change (like a schema change) and to transactionally write it to the change log table does not always work.

Distributed Transactions

Distributed transactions can be used to span a transaction across multiple heterogeneous datastores so that a write operation is either committed to all involved stores or to none.

Issues:
Distributed transactions have proven to be problematic across heterogeneous datastores. By their nature, they can only rely on the lowest common denominator of participating systems. For example, XA transactions block execution if the application process fails during the prepare phase; moreover, XA provides no deadlock detection and no support for optimistic concurrency-control schemes. Also, certain systems like ElasticSearch, do not support XA or any other heterogeneous transaction model. Thus, ensuring the atomicity of writes across different storage technologies remains a challenging problem for applications [3].

Delta

Delta has been developed to address the limitations of existing solutions for data synchronization, and also allows to enrich data on the fly. Our goal was to abstract those complexities from application developers so they can focus on implementing business features. In the following, we are describing “Movie Search”, an actual use case within Netflix that leverages Delta.

In Netflix the microservice architecture is widely adopted and each microservice typically handles only one type of data. The core movie data resides in a microservice called Movie Service, and related data such as movie deals, talents, vendors and so on are managed by multiple other microservices (e.g Deal Service, Talent Service and Vendor Service). Business users in Netflix Studios often need to search by various criteria for movies in order to keep track of productions, therefore, it is crucial for them to be able to search across all data that are related to movies.

Prior to Delta, the movie search team had to fetch data from multiple other microservices before indexing the movie data. Moreover, the team had to build a system that periodically updated their search index by querying others for changes, even if there was no change at all. That system quickly grew very complex and became difficult to maintain.

Figure 1. Polling System Prior to Delta

After on-boarding to Delta, the system is simplified into an event driven system, as depicted in the following diagram. CDC (Change-Data-Capture) events are sent by the Delta-Connector to a Keystone Kafka topic. A Delta application built using the Delta Stream Processing Framework consumes the CDC events from the topic, enriches each of them by calling other microservices, and finally sinks the enriched data to the search index in Elasticsearch. The whole process is nearly real-time, meaning as soon as the changes are committed to the datastore, the search indexes are updated.

Figure 2. Data Pipeline using Delta

In the following sections, we are going to describe the Delta-Connector that connects to a datastore and publishes CDC events to the Transport Layer, which is a real-time data transportation infrastructure routing CDC events to Kafka topics. And lastly we are going to describe the Delta Stream Processing Framework that application developers can use to build their data processing and enrichment logics.

CDC (Change-Data-Capture)

We have developed a CDC service named Delta-Connector, which is able to capture committed changes from a datastore in real-time and write them to a stream. Real-time changes are captured from the datastore’s transaction log and dumps. Dumps are taken because transaction logs typically do not contain the full history of changes. Changes are commonly serialized as Delta events so that a consumer does not need to be concerned if a change originates from the transaction log or a dump.

Delta-Connector offers multiple advanced features such as:

  • Ability to write into custom outputs beyond Kafka.
  • Ability to trigger manual dumps at any time, for all tables, a specific table, or for specific primary keys.
  • Dumps can be taken in chunks, so that there is no need to repeat from scratch in case of failure.
  • No need to acquire locks on tables, which is essential to ensure that the write traffic on the database is never blocked by our service.
  • High availability, via standby instances across AWS Availability Zones.

We currently support MySQL and Postgres, including when deployed in AWS RDS and its Aurora flavor. In addition, we support Cassandra (multi-master). We will cover the Delta-Connector in more detail in upcoming blog posts.

Kafka & Transport Layer

The transport layer of Delta events were built on top of the Messaging Service in our Keystone platform.

Historically, message publishing at Netflix is optimized for availability instead of durability (see a previous blog). The tradeoff is potential broker data inconsistencies in various edge scenarios. For example, unclean leader election will result in consumer to potentially duplicate or lose events.

For Delta, we want stronger durability guarantees in order to make sure CDC events can be guaranteed to arrive to derived stores. To enable this, we offered special purpose built Kafka cluster as a first class citizen. Some broker configuration looks like below.

In Keystone Kafka clusters, unclean leader election is usually enabled to favor producer availability. This can result in messages being lost when an out-of-sync replica is elected as a leader. For the new high durability Kafka cluster, unclean leader election is disabled to prevent these messages getting lost.

We’ve also increased the replication factor from 2 to 3 and the minimum insync replicas from 1 to 2. Producers writing to this cluster require acks from all, to guarantee that 2 out of 3 replicas have the latest messages that were written by the producers.

When a broker instance gets terminated, a new instance replaces the terminated broker. However, this new broker will need to catch up on out-of-sync replicas, which may take hours. To improve the recovery time for this scenario, we started using block storage volumes (Amazon Elastic Block Store) instead of local disks on the brokers. When a new instance replaces the terminated broker, it now attaches the EBS volume that the terminated instance had and starts catching up on new messages. This process reduces the catch up time from hours to minutes since the new instance no longer have to replicate from a blank state. In general, the separate life cycles of storage and broker greatly reduce the impact of broker replacement.

To further maximize our delivery guarantee, we used the message tracing system to detect any message loss due to extreme conditions (e.g clock drift on the partition leader).

Stream Processing Framework

The processing layer of Delta is built on top of Netflix SPaaS platform, which provides Apache Flink integration with the Netflix ecosystem. The platform provides a self-service UI which manages Flink job deployments and Flink cluster orchestration on top of our container management platform Titus. The self-service UI also manages job configurations and allows users to make dynamic configuration changes without having to recompile the Flink job.

Delta provides a stream processing framework on top of Flink and SPaaS that uses an annotation driven DSL (Domain Specific Language) to abstract technical details further away. For example, to define a step that enriches events by calling external services, users only need to write the following DSL and the framework will translate it into a model which is executed by Flink.

Figure 3. Enrichment DSL Example in a Delta Application

The processing framework not only reduces the learning curve, but also provides common stream processing functionalities like deduplication, schematization, as well as resilience and fault tolerance to address general operational concerns.

Delta Stream Processing Framework consists of two key modules, the DSL & API module and Runtime module. The DSL & API module provides the annotation based DSL and UDF (User-Defined-Function) APIs for users to write custom processing logic (e.g filter and transformation). The Runtime module provides DSL parser implementation that builds an internal representation of the processing steps in DAG models. The Execution component interprets the DAG models to initialize the actual Flink operators and eventually run the Flink app. The architecture of the framework is illustrated in the following Chart.

Figure 4. Delta Stream Processing Framework Architecture

This approach has several benefits:

  • Users can focus on their business logic without the need of learning the specifics of Flink or the SPaaS framework.
  • Optimization can be made in a way that is transparent to users, and bugs can be fixed without requiring any changes to user code (UDFs).
  • Operating Delta applications is made simple for users as the framework provides resilience and failure tolerance out of the box and collects many granular metrics that can be used for alerts.

Production Usages

Delta has been running in production for over a year and has been playing a crucial role in many Netflix Studio applications. It has helped teams implement use cases such as search indexing, data warehousing, and event driven workflows. Below is a view of the high level architecture of the Delta platform.

Figure 5. High Level Architecture of Delta

Stay Tuned

We will publish follow-up blogs about technical details of the key components such as Delta-Connector and Delta Stream Processing Framework. Please stay tuned. Also feel free to reach out to the authors for any questions you may have.

Credits

We would like to thank the following persons that have been involved in making Delta successful at Netflix: Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti Srinivasan, Sandeep Gupta, Steven Wu, Tharanga Gamaethige, Yun Wang, and Zhenzhong Xu.

References

  1. https://dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
  2. https://dev.mysql.com/doc/refman/5.7/en/cannot-roll-back.html
  3. Martin Kleppmann, Alastair R. Beresford, and Boerge Svingen. 2019. Online Event Processing. Queue 17, 1, pages 40 (February 2019), 21 pages. DOI: https://doi.org/10.1145/3317287.3321612


Delta: A Data Synchronization and Enrichment Platform was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Building and Scaling Data Lineage at Netflix to Improve Data Infrastructure Reliability, and…

Post Syndicated from Netflix Technology Blog original https://medium.com/netflix-techblog/building-and-scaling-data-lineage-at-netflix-to-improve-data-infrastructure-reliability-and-1a52526a7977?source=rss----2615bd06b42e---4

Building and Scaling Data Lineage at Netflix to Improve Data Infrastructure Reliability, and Efficiency

By: Di Lin, Girish Lingappa, Jitender Aswani

Imagine yourself in the role of a data-inspired decision maker staring at a metric on a dashboard about to make a critical business decision but pausing to ask a question — “Can I run a check myself to understand what data is behind this metric?”

Now, imagine yourself in the role of a software engineer responsible for a micro-service which publishes data consumed by few critical customer facing services (e.g. billing). You are about to make structural changes to the data and want to know who and what downstream to your service will be impacted.

Finally, imagine yourself in the role of a data platform reliability engineer tasked with providing advanced lead time to data pipeline (ETL) owners by proactively identifying issues upstream to their ETL jobs. You are designing a learning system to forecast Service Level Agreement (SLA) violations and would want to factor in all upstream dependencies and corresponding historical states.

At Netflix, user stories centered on understanding data dependencies shared above and countless more in Detection & Data Cleansing, Retention & Data Efficiency, Data Integrity, Cost Attribution, and Platform Reliability subject areas inspired Data Engineering and Infrastructure (DEI) team to envision a comprehensive data lineage system and embark on a development journey a few years ago. We adopted the following mission statement to guide our investments:

“Provide a complete and accurate data lineage system enabling decision-makers to win moments of truth.”

In the rest of this blog, we will a) touch on the complexity of Netflix cloud landscape, b) discuss lineage design goals, ingestion architecture and the corresponding data model, c) share the challenges we faced and the learnings we picked up along the way, and d) close it out with “what’s next” on this journey.

Netflix Data Landscape

Freedom & Responsibility (F&R) is the lynchpin of Netflix’s culture empowering teams to move fast to deliver on innovation and operate with freedom to satisfy their mission. Central engineering teams provide paved paths (secure, vetted and supported options) and guard rails to help reduce variance in choices available for tools and technologies to support the development of scalable technical architectures. Nonetheless, Netflix data landscape (see below) is complex and many teams collaborate effectively for sharing the responsibility of our data system management. Therefore, building a complete and accurate data lineage system to map out all the data-artifacts (including in-motion and at-rest data repositories, Kafka topics, apps, reports and dashboards, interactive and ad-hoc analysis queries, ML and experimentation models) is a monumental task and requires a scalable architecture, robust design, a strong engineering team and above all, amazing cross-functional collaboration.

Data Landscape

Design Goals

At the project inception stage, we defined a set of design goals to help guide the architecture and development work for data lineage to deliver a complete, accurate, reliable and scalable lineage system mapping Netflix’s diverse data landscape. Let’s review a few of these principles:

  • Ensure data integrity — Accurately capture the relationship in data from disparate data sources to establish trust with users because without absolute trust lineage data may do more harm than good.
  • Enable seamless integration — Design the system to integrate with a growing list of data tools and platforms including the ones that do not have the built-in meta-data instrumentation to derive data lineage from.
  • Design a flexible data model— Represent a wide range of data artifacts and relationships among them using a generic data model to enable a wide variety of business use cases.

Ingestion-at-scale

The data movement at Netflix does not necessarily follow a single paved path since engineers have the freedom to choose (and the responsibility to manage) the best available data tools and platforms to achieve their business goals. As a result, a single consolidated and centralized source of truth does not exist that can be leveraged to derive data lineage truth. Therefore, the ingestion approach for data lineage is designed to work with many disparate data sources.

Our data ingestion approach, in a nutshell, is classified broadly into two buckets — push or pull. Today, we are operating using a pull-heavy model. In this model, we scan system logs and metadata generated by various compute engines to collect corresponding lineage data. For example, we leverage inviso to list pig jobs and then lipstick to fetch tables and columns from these pig scripts. For spark compute engine, we leverage spark plan information and for Snowflake, admin tables capture the same information. In addition, we derive lineage information from scheduled ETL jobs by extracting workflow definitions and runtime metadata using Meson scheduler APIs.

In the push model paradigm, various platform tools such as the data transportation layer, reporting tools, and Presto will publish lineage events to a set of lineage related Kafka topics, therefore, making data ingestion relatively easy to scale improving scalability for the data lineage system.

Data Enrichment

The lineage data, when enriched with entity metadata and associated relationships, become more valuable to deliver on a rich set of business cases. We leverage Metacat data, our internal metadata store and service, to enrich lineage data with additional table metadata. We also leverage metadata from another internal tool, Genie, internal job and resource manager, to add job metadata (such as job owner, cluster, scheduler metadata) on lineage data. The ingestions (ETL) pipelines transform enriched datasets to a common data model (design based on a graph structure stored as vertices and edges) to serve lineage use cases. The lineage data along with the enriched information is accessed through many interfaces using SQL against the warehouse and Gremlin and a REST Lineage Service against a graph database populated from the lineage data discussed earlier in this paragraph.

Challenges

We faced a diverse set of challenges spread across many layers in the system. Netflix’s diverse data landscape made it challenging to capture all the right data and conforming it to a common data model. In addition, the ingestion layer designed to address several ingestions patterns added to operational complexity. Spark is the primary big-data compute engine at Netflix and with pretty much every upgrade in Spark, the spark plan changed as well springing continuous and unexpected surprises for us.

We defined a generic data model to store lineage information and now conforming the entity and associated relationships from various data sources to this data model. We are loading the lineage data to a graph database to enable seamless integration with a REST data lineage service to address business use cases. To improve data accuracy, we decided to leverage AWS S3 access logs to identify entity relationships not been captured by our traditional ingestion process.

We are continuing to address the ingestion challenges by adopting a system level instrumentation approach for spark, other compute engines, and data transport tools. We are designing a CRUD layer and exposing it as REST APIs to make it easier for anyone to publish lineage data to our pipelines.

We are taking a mature and comprehensive data lineage system and now extending its coverage far beyond traditional data warehouse realms with a goal to build universal data lineage to represent all the data artifacts and corresponding relationships. We are tackling a bunch of very interesting known unknowns with exciting initiatives in the field of data catalog and asset inventory. Mapping micro-services interactions, entities from real time infrastructure, and ML infrastructure and other non traditional data stores are few such examples.

Lineage Architecture and Data Model

Data Flow

As illustrated in the diagram above, various systems have their own independent data ingestion process in place leading to many different data models that store entities and relationships data at varying granularities. This data needed to be stitched together to accurately and comprehensively describe the Netflix data landscape and required a set of conformance processes before delivering the data for a wider audience.

During the conformance process, the data collected from different sources is transformed to make sure that all entities in our data flow, such as tables, jobs, reports, etc. are described in a consistent format, and stored in a generic data model for further usage.

Based on a standard data model at the entity level, we built a generic relationship model that describes the dependencies between any pair of entities. Using this approach, we are able to build a unified data model and the repository to deliver the right leverage to enable multiple use cases such as data discovery, SLA service and Data Efficiency.

Current Use Cases

Big Data Portal, a visual interface to data management at Netflix, has been the primary consumer of lineage data thus far. Many features benefit from lineage data including ranking of search results, table column usage for downstream jobs, deriving upstream dependencies in workflows, and building visibility of jobs writing to downstream tables.

Our most recent focus has been on powering (a) a data lineage service (REST based) leveraged by SLA service and (b) the data efficiency (to support data lifecycle management) use cases. SLA service relies on the job dependencies defined in ETL workflows to alert on potential SLA misses. This service also proactively alerts on any potential delays in few critical reports due to any job delays or failures anywhere upstream to it.

The data efficiency use cases leverages visibility on entities and their relationships to drive cost and attribution gains, auto cleansing of unused data in the warehouse .

What’s next?

Our journey on extending the value of lineage data to new frontiers has just begun and we have a long way to go in achieving the overarching goal of providing universal data lineage representing all entities and corresponding relationships for all data at Netflix. In the short to medium term, we are planning to onboard more data platforms and leverage graph database and a lineage REST service and GraphQL interface to enable more business use cases including improving developer productivity. We also plan to increase our investment in data detection initiatives by integrating lineage data and detection tools to efficiently scan our data to further improve data hygiene.

Please share your experience by adding your comments below and stay tuned for more on data lineage at Netflix in the follow up blog posts. .

Credits: We want to extend our sincere thanks to many esteemed colleagues from the data platform (part of Data Engineering and Infrastructure org) team at Netflix who pioneered this topic before us and who continue to extend our thinking on this topic with their valuable insights and are building many useful services on lineage data.

We will be at Strata San Francisco on March 27th in room 2001 delivering a tech session on this topic, please join us and share your experiences.

If you would like to be part of our small, impactful, and collaborative team — come join us.


Building and Scaling Data Lineage at Netflix to Improve Data Infrastructure Reliability, and… was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Bullet Updates – Windowing, Apache Pulsar PubSub, Configuration-based Data Ingestion, and More

Post Syndicated from rosaliebeevm original https://yahooeng.tumblr.com/post/183315480351

yahoodevelopers:

By Akshay Sarma, Principal Engineer, Verizon Media & Brian Xiao, Software Engineer, Verizon Media

This is the first of an ongoing series of blog posts sharing releases and announcements for Bullet, an open-sourced lightweight, scalable, pluggable, multi-tenant query system.

Bullet allows you to query any data flowing through a streaming system without having to store it first through its UI or API. The queries are injected into the running system and have minimal overhead. Running hundreds of queries generally fit into the overhead of just reading the streaming data. Bullet requires running an instance of its backend on your data. This backend runs on common stream processing frameworks (Storm and Spark Streaming currently supported).

The data on which Bullet sits determines what it is used for. For example, our team runs an instance of Bullet on user engagement data (~1M events/sec) to let developers find their own events to validate their code that produces this data. We also use this instance to interactively explore data, throw up quick dashboards to monitor live releases, count unique users, debug issues, and more.

Since open sourcing Bullet in 2017, we’ve been hard at work adding many new features! We’ll highlight some of these here and continue sharing update posts for future releases.

Windowing

Bullet used to operate in a request-response fashion – you would submit a query and wait for the query to meet its termination conditions (usually duration) before receiving results. For short-lived queries, say, a few seconds, this was fine. But as we started fielding more interactive and iterative queries, waiting even a minute for results became too cumbersome.

Enter windowing! Bullet now supports time and record-based windowing. With time windowing, you can break up your query into chunks of time over its duration and retrieve results for each chunk.  For example, you can calculate the average of a field, and stream back results every second:

In the above example, the aggregation is operating on all the data since the beginning of the query, but you can also do aggregations on just the windows themselves. This is often called a Tumbling window:

image

With record windowing, you can get the intermediate aggregation for each record that matches your query (a Sliding window). Or you can do a Tumbling window on records rather than time. For example, you could get results back every three records:

image

Overlapping windows in other ways (Hopping windows) or windows that reset based on different criteria (Session windows, Cascading windows) are currently being worked on. Stay tuned!

image
image

Apache Pulsar support as a native PubSub

Bullet uses a PubSub (publish-subscribe) message queue to send queries and results between the Web Service and Backend. As with everything else in Bullet, the PubSub is pluggable. You can use your favorite pubsub by implementing a few interfaces if you don’t want to use the ones we provide. Until now, we’ve maintained and supported a REST-based PubSub and an Apache Kafka PubSub. Now we are excited to announce supporting Apache Pulsar as well! Bullet Pulsar will be useful to those users who want to use Pulsar as their underlying messaging service.

If you aren’t familiar with Pulsar, setting up a local standalone is very simple, and by default, any Pulsar topics written to will automatically be created. Setting up an instance of Bullet with Pulsar instead of REST or Kafka is just as easy. You can refer to our documentation for more details.

image

Plug your data into Bullet without code

While Bullet worked on any data source located in any persistence layer, you still had to implement an interface to connect your data source to the Backend and convert it into a record container format that Bullet understands. For instance, your data might be located in Kafka and be in the Avro format. If you were using Bullet on Storm, you would perhaps write a Storm Spout to read from Kafka, deserialize, and convert the Avro data into the Bullet record format. This was the only interface in Bullet that required our customers to write their own code. Not anymore! Bullet DSL is a text/configuration-based format for users to plug in their data to the Bullet Backend without having to write a single line of code.

Bullet DSL abstracts away the two major components for plugging data into the Bullet Backend. A Connector piece to read from arbitrary data-sources and a Converter piece to convert that read data into the Bullet record container. We currently support and maintain a few of these – Kafka and Pulsar for Connectors and Avro, Maps and arbitrary Java POJOs for Converters. The Converters understand typed data and can even do a bit of minor ETL (Extract, Transform and Load) if you need to change your data around before feeding it into Bullet. As always, the DSL components are pluggable and you can write your own (and contribute it back!) if you need one that we don’t support.

We appreciate your feedback and contributions! Explore Bullet on GitHub, use and help contribute to the project, and chat with us on Google Groups. To get started, try our Quickstarts on Spark or Storm to set up an instance of Bullet on some fake data and play around with it.

How we simplified our Data Ingestion & Transformation Process

Post Syndicated from Grab Tech original https://engineering.grab.com/data-ingestion-transformation-product-insights

Introduction

As Grab grew from a small startup to an organisation serving millions of customers and driver partners, making day-to-day data-driven decisions became paramount. We needed a system to efficiently ingest data from mobile apps and backend systems and then make it available for analytics and engineering teams.

Thanks to modern data processing frameworks, ingesting data isn’t a big issue. However, at Grab scale it is a non-trivial task. We had to prepare for two key scenarios:

  • Business growth, including organic growth over time and expected seasonality effects.
  • Any unexpected peaks due to unforeseen circumstances. Our systems have to be horizontally scalable.

We could ingest data in batches, in real time, or a combination of the two. When you ingest data in batches, you can import it at regularly scheduled intervals or when it reaches a certain size. This is very useful when processes run on a schedule, such as reports that run daily at a specific time. Typically, batched data is useful for offline analytics and data science.

On the other hand, real-time ingestion has significant business value, such as with reactive systems. For example, when a customer provides feedback for a Grab superapp widget, we re-rank widgets based on that customer’s likes or dislikes. Note when information is very time-sensitive, you must continuously monitor its data.

This blog post describes how Grab built a scalable data ingestion system and how we went from prototyping with Spark Streaming to running a production-grade data processing cluster written in Golang.

Building the system without reinventing the wheel

The data ingestion system:

  1. Collects raw data as app events.
  2. Transforms the data into a structured format.
  3. Stores the data for analysis and monitoring.

In a previous blog post, we discussed dealing with batched data ETL with Spark. This post focuses on real-time ingestion.

We separated the data ingestion system into 3 layers: collection, transformation, and storage. This table and diagram highlights the tools used in each layer in our system’s first design.

Layer Tools
Collection Gateway, Kafka
Transformation Go processing service, Spark Streaming
Storage TalariaDB

Our first design might seem complex, but we used battle-tested and common tools such as Apache Kafka and Spark Streaming. This let us get an end-to-end solution up and running quickly.

Collection layer

Our collection layer had two sub-layers:

  1. Our custom built API Gateway received HTTP requests from the mobile app. It simply decoded and authenticated HTTP requests, streaming the data to the Kafka queue.
  2. The Kafka queue decoupled the transformation layer (shown in the above figure as the processing service and Spark streaming) from the collection layer (shown above as the Gateway service). We needed to retain raw data in the Kafka queue for fault tolerance of the entire system. Imagine an error where a data pipeline pollutes the data with flawed transformation code or just simply crashes. The Kafka queue saves us from data loss by data backfilling.

Since it’s robust and battle-tested, we chose Kafka as our queueing solution. It perfectly met our requirements, such as high throughput and low latency. Although Kafka takes some operational effort such as self-hosting and monitoring, Grab has a proficient and dedicated team managing our Kafka cluster.

Transformation layer

There are many options for real-time data processing, including Spark Streaming, Flink, and Storm. Since we use Spark for all our batch processing, we decided to use Spark Streaming.

We deployed a Golang processing service between Kafka and Spark Streaming. This service converts the data from Protobuf to Avro. Instead of pointing Spark Streaming directly to Kafka, we used this processing service as an intermediary. This was because our Spark Streaming job was written in Python and Spark doesn’t natively support protobuf decoding.  We used Avro format, since Grab historically used it for archiving streaming data. Each raw event was enriched and batched together with other events. Batches were then uploaded to S3.

Storage layer

TalariaDB is a Grab-built time-series database. It ingests events as columnar ORC files, indexing them by event name and time. We use the same ORC format files for batch processing. TalariaDB also implements the Presto Thrift connector interface, so our users could query certain event types by time range. They did this by connecting a Presto to a TalariaDB hosting distributed cluster.

Problems

Building and deploying our data pipeline’s MVP provided great value to our data analysts, engineers, and QA team. For example, our mobile app team could monitor any abnormal change in the real-time metrics, such as the screen load time for the latest released app version. The QA team could perform app side actions (book a ride, make payment, etc.) and check which events were triggered and received by the backend. The latency between the ingestion and the serving layer was only 4 minutes instead of the batch processing system’s 60 minutes. The streaming processing’s data showed good business value.

This prompted us to develop more features on top of our platform-collected real-time data. Very soon our QA engineers and the product analytics team used more and more of the real-time data processing system. They started instrumenting various mobile applications so more data started flowing in. However, as our ingested data increased, so did our problems. These were mostly related to operational complexity and the increased latency.

Operational complexity

Only a few team members could operate Spark Streaming and EMR. With more data and variable rates, our streaming jobs had scaling issues and failed occasionally. This was due to checkpoint issues when the cluster was under heavy load. Increasing the cluster size helped, but adding more nodes also increased the likelihood of losing more cluster nodes. When we lost nodes,our latency went up and added more work for our already busy on-call engineers.

Supporting native Protobuf

To simplify the architecture, we initially planned to bypass our Golang-written processing service for the real-time data pipeline. Our plan was to let Spark directly talk to the Kafka queue and send the output to S3. This required packaging the decoders for our protobuf messages for Python Spark jobs, which was cumbersome. We thought about rewriting our job in Scala, but we didn’t have enough experience with it.

Also, we’d soon hit some streaming limits from S3. Our Spark streaming job was consuming objects from S3, but the process was not continuous due to S3’s  eventual consistency. To avoid long pagination queries in the S3 API, we had to prefix the data with the hour in which it was ingested. This resulted in some data loss after processing by the Spark streaming. The loss happened because the new data would appear in S3 while Spark Streaming had already moved on to the next hour. We tried various tweaks, but it was just a bad design. As our data grew to over one terabyte per hour, our data loss grew with it.

Processing lag

On average, the time from our system ingesting an event to when it was available on the Presto was 4 to 6 minutes. We call that processing lag, as it happened due to our data processing. It was substantially worse under heavy loads, increasing to 8 to 13 minutes. While that wasn’t bad at this scale (a few TBs of data), it made some use cases impossible, such as monitoring. We needed to do better.

Simplifying the architecture and rewriting in Golang

After completing the MVP phase development, we noticed the Spark Streaming functionality we actually used was relatively trivial. In the Spark Streaming job, we only:

  • Partitioned the batch of events by event name.
  • Encoded the data in ORC format.
  • And uploaded to an S3 bucket.

To mitigate the problems mentioned above, we tried re-implementing the features in our existing Golang processing service. Besides consuming the data and publishing to an S3 bucket, the transformation service also needed to deal with event partitioning and ORC encoding.

One key problem we addressed was implementing a robust event partitioner with a large write throughput and low read latency. Fortunately, Golang has a nice concurrent map package. To further reduce the lock contention, we added sharding.

We made the changes, deployed the service to production,and discovered our service was now memory-bound as we buffered data for 1 minute. We did thorough benchmarking and profiling on heap allocation to improve memory utilization. By iteratively reducing inefficiencies and contributing to a lower CPU consumption, we made our data transformation more efficient.

Performance

After revamping the system, the elapsed time for a single event to travel from the gateway to our dashboard is about 1 minute. We also fixed the data loss issue. Finally, we significantly reduced our on-call workload by removing Spark Streaming.

Validation

At this point, we had both our old and new pipelines running in parallel. After drastically improving our performance, we needed to confirm we still got the same end results. This was done by running a query against each of the pipelines and comparing the results. Both systems were registered to the same Presto cluster.

We ran two SQL “excerpts” between the two pipelines in different order. Both queries returned the same events, validating our new pipeline’s correctness.

select count(1) from ((
 select uuid, time from grab_x.realtime_new
 where event = 'app.metric1' and time between 1541734140 and 1541734200
) except (
 select uuid, time from grab_x.realtime_old
 where event = 'app.metric1' and time between 1541734140 and 1541734200
))

/* output: 0 */

Conclusions

Scaling a data ingestion system to handle hundreds of thousands of events per second was a non-trivial task. However, by iterating and constantly simplifying our overall architecture, we were able to efficiently ingest the data and drive down its lag to around one minute.

Spark Streaming was a great tool and gave us time to understand the problem. But, understanding what we actually needed to build and iteratively optimise the entire data pipeline led us to:

  • Replacing Spark Streaming with our new Golang-implemented pipeline.
  • Removing Avro encoding.
  • Removing an intermediary S3 step.

Differences between the old and new pipelines are:

Old Pipeline New Pipeline
Languages Python, Go Go
Stages 4 services 3 services
Conversions Protobuf → Avro → ORC Protobuf → ORC
Lag 4-13 min 1 min

Systems usually become more and more complex over time, leading to tech debt and decreased performance. In our case, starting with more steps in the data pipeline was actually the simple solution, since we could re-use existing tools. But as we reduced processing stages, we’ve also seen fewer failures. By simplifying the problem, we improved performance and decreased operational complexity. At the end of the day, our data pipeline solves exactly our problem and does nothing else, keeping things fast.