Tag Archives: Data Analytics

Enabling near real-time data analytics on the data lake

Post Syndicated from Grab Tech original https://engineering.grab.com/enabling-near-realtime-data-analytics

Introduction

In the domain of data processing, data analysts run their ad hoc queries on the data lake. The lake serves as an interface between our analytics and production environment, preventing downstream queries from impacting upstream data ingestion pipelines. To ensure efficient data processing in the data lake, choosing appropriate storage formats is crucial.

The vanilla data lake solution is built on top of cloud object storage with a Hive metastore, where data files are written in Parquet format. Although this setup is optimised for scalable analytics query patterns, it struggles to handle frequent updates to the data for two reasons:

  1. The Hive table format requires us to rewrite the Parquet files with the latest data. For instance, to update one record in a Hive unpartitioned table, we would need to read all the data, update the record, and write back the entire data set.
  2. Writing Parquet files is expensive due to the overhead of organising the data into a compressed columnar format, which is more complex than a row format.

The issue is further exacerbated by the scheduled downstream transformations. These necessary steps, which clean and process the data for use, increase the latency because the total delay now includes the combined scheduled intervals of these processing jobs.

Fortunately, the introduction of the Hudi format, which supports fast writes by allowing Avro and Parquet files to co-exist on a Merge On Read (MOR) table, opens up the possibility of having a data lake with minimal data latency. The concept of a commit timeline further allows data to be served with Atomicity, Consistency, Isolation, and Durability (ACID) guarantees.

We employ different sets of configurations for the different characteristics of our input sources:

  1. High or low throughput. A high-throughput source refers to one that has a high level of activity. One example of this can be our stream of booking events generated from each customer transaction. On the other hand, a low-throughput source would be one that has a relatively low level of activity. An example of this can be transaction events generated from reconciliation happening on a nightly basis.
  2. Kafka (unbounded) or Relational Database Sources (bounded). Our sinks have sources that can be broadly categorised into unbounded and bounded sources. Unbounded sources are usually related to transaction events materialised as Kafka topics, representing user-generated events as they interact with the Grab superapp. Bounded sources usually refer to Relational Database (RDS) sources, whose size is bounded by the storage provisioned.

The following sections will delve into the differences between each source and our corresponding configurations optimised for them.

High throughput source

For our data sources with high throughput, we have chosen to write the files to MOR tables, since writing the log files in Avro format allows for fast writes that meet our latency requirements.

Figure 1: Architecture for MOR tables

As seen in Figure 1, we use Flink to perform the stream processing and write out log files in Avro format in our setup. We then set up a separate Spark writer which periodically converts the Avro files into Parquet format in the Hudi compaction process.

We have further simplified the coordination between the Flink and Spark writers by enabling asynchronous services on the Flink writer so it can generate the compaction plans for Spark writers to act on. During its runs, the Spark job checks for available compaction plans and acts on them, placing the burden of orchestrating the writes solely on the Flink writer. This approach could help minimise potential concurrency problems that might otherwise arise, as there is a single actor orchestrating the associated Hudi table services.
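To make this concrete, here is a minimal sketch of how such a MOR table could be declared from the Flink writer using Hudi's Flink SQL connector. The table, columns, and S3 path are hypothetical and the exact option set depends on the Hudi version; the key point is that the Flink writer only schedules compaction plans while asynchronous compaction execution is disabled, leaving the execution to the Spark job.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Hypothetical table, columns, and path; option keys follow Hudi's Flink connector configuration.
tEnv.executeSql(
    "CREATE TABLE booking_events (" +
    "  booking_id STRING PRIMARY KEY NOT ENFORCED," +
    "  event_time TIMESTAMP(3)," +
    "  payload STRING," +
    "  event_hour STRING" +
    ") PARTITIONED BY (event_hour) WITH (" +
    "  'connector' = 'hudi'," +
    "  'path' = 's3://<bucket>/booking_events'," +
    "  'table.type' = 'MERGE_ON_READ'," +
    "  'compaction.schedule.enabled' = 'true'," +  // Flink only generates compaction plans
    "  'compaction.async.enabled' = 'false'" +     // execution is left to the Spark compaction job
    ")");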

Low throughput source

Figure 2: Architecture for COW tables

For low throughput sources, we gravitate towards Copy On Write (COW) tables given the simplicity of their design: they involve only one component, the Flink writer. The downside is higher data latency, because this setup only generates Parquet format data snapshots at each checkpoint interval, which is typically about 10-15 minutes.
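As a minimal sketch (with an assumed checkpoint interval), the COW setup differs from the MOR one mainly in the table type and in how often the Flink job checkpoints, since a new Parquet snapshot only becomes visible at each successful checkpoint:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Data only becomes queryable at each checkpoint, so this interval drives the data latency.
env.enableCheckpointing(15 * 60 * 1000L); // roughly 15 minutes, in milliseconds

// In the table definition, the Hudi-specific change is simply:
//   'table.type' = 'COPY_ON_WRITE'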

Connecting to our Kafka (unbounded) data source

Grab uses Protobuf as our central data format in Kafka, ensuring schema evolution compatibility. However, the derivation of the schema of these topics still requires some transformation to make it compatible with Hudi’s accepted schema. Some of these transformations include ensuring that Avro record fields do not contain just a single array field, and handling logical decimal schemas to transform them to fixed byte schema for Spark compatibility.
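As an illustration of the second transformation, the hypothetical helper below uses the standard Avro API to rebuild a decimal logical type on top of a fixed schema; the field name, namespace, precision, scale, and byte size are all placeholders:

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

// Rebuild a decimal logical type backed by a fixed schema instead of bytes,
// so that the resulting files remain readable by Spark.
static Schema toFixedDecimal(String fieldName, int precision, int scale, int byteSize) {
    Schema fixed = Schema.createFixed(fieldName + "_fixed", null, "com.example.schema", byteSize);
    return LogicalTypes.decimal(precision, scale).addToSchema(fixed);
}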

Given the unbounded nature of the source, we decided to partition it by Kafka event time up to the hour level. This ensured that our Hudi operations would be faster. Parquet file writes would be faster since they would only affect files within the same partition, and each Parquet file within the same event time partition would have a bounded size given the monotonically increasing nature of Kafka event time.

By partitioning tables by Kafka event time, we can further optimise compaction planning operations, since the number of file lookups required is reduced with the use of BoundedPartitionAwareCompactionStrategy. Only log files in recent partitions are selected for compaction, and the job manager no longer needs to list every partition to figure out which log files to select during the planning phase.
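A sketch of the corresponding compaction planning configuration is shown below; the key names come from Hudi's compaction configuration, while the value shown is illustrative and would be passed to whichever component schedules the compaction plans:

import java.util.HashMap;
import java.util.Map;

Map<String, String> compactionProps = new HashMap<>();
// Plan compaction only over recent (bounded) partitions instead of listing every partition.
compactionProps.put("hoodie.compaction.strategy",
    "org.apache.hudi.table.action.compact.strategy.BoundedPartitionAwareCompactionStrategy");
// How often plans are generated (e.g. after N delta commits) is controlled by separate compaction options.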

Connecting to our RDS (bounded) data source

For our RDS, we decided to use the Flink Change Data Capture (CDC) connectors by Ververica to obtain the binlog streams. The RDS then treats the Flink writer as a replication server and starts streaming its binlog data to it for each MySQL change. The Flink CDC connector presents the data as a Kafka Connect (KC) SourceRecord, since it uses the Debezium connector under the hood. It is then a straightforward task to deserialise these records and transform them into Hudi records, since the Avro schema and associated data changes are already captured within the KC source record.
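A minimal sketch of wiring up such a CDC source with the Flink CDC connector is shown below, using hypothetical connection details; it uses the JSON Debezium deserializer for brevity, whereas our pipeline deserialises the underlying KC source records into Hudi records:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical RDS connection details and table names.
MySqlSource<String> rdsBinlogSource = MySqlSource.<String>builder()
    .hostname("<rds-host>")
    .port(3306)
    .databaseList("payments")
    .tableList("payments.transactions")
    .username("<user>")
    .password("<password>")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(rdsBinlogSource, WatermarkStrategy.noWatermarks(), "rds-binlog-source")
   .print(); // in the real pipeline this would be transformed and written to the Hudi table
env.execute("rds-cdc-sketch");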

The obtained binlog timestamp is also emitted as a metric during consumption for us to monitor the observed data latency at the point of ingestion.

Optimising for these sources involves two phases:

  1. First, assigning more resources for the cold start incremental snapshot process where Flink takes a snapshot of the current data state in the RDS and loads the Hudi table with that snapshot. This phase is usually resource-heavy as there are a lot of file writes and data ingested during this process.
  2. Once the snapshotting is completed, Flink would then start to process the binlog stream and the observed throughput would drop to a level similar to the DB write throughput. The resources required by the Flink writer at this stage would be much lower than in the snapshot phase.

Indexing for Hudi tables

Indexing is important for upserting Hudi tables: when the writing engine performs updates, the index allows it to efficiently locate the file groups containing the data to be updated.

As of version 0.14, the Flink engine only supports Bucket Index or Flink State Index. Bucket Index works by hashing the record key and matching the hash to a specific bucket of files, indicated by the naming convention of the written data files. Flink State Index, on the other hand, stores the index map of record keys to files in memory.

Given that our tables include unbounded Kafka sources, there is a possibility for our state indexes to grow indefinitely. Furthermore, the requirement of state preservation for Flink State Index across version deployments and configuration updates adds complexity to the overall solution.

We opted for Bucket Index for its simplicity, and because our Hudi table size per partition does not change drastically across the week. However, this comes with a limitation: the number of buckets cannot be updated easily, and it imposes a parallelism limit at which our Flink pipelines can scale. As traffic grows organically, we would eventually find ourselves in a situation where our configuration grows obsolete and cannot handle the increased load.
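For reference, Bucket Index is enabled on the writer with a couple of table options; the key names follow Hudi's configuration and the bucket count shown here is purely illustrative:

import java.util.HashMap;
import java.util.Map;

Map<String, String> indexOptions = new HashMap<>();
indexOptions.put("index.type", "BUCKET");                  // use Bucket Index instead of Flink State Index
indexOptions.put("hoodie.bucket.index.num.buckets", "64"); // fixed bucket count; illustrative value
// The fixed bucket count also caps the effective write parallelism, which is the limitation noted above.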

To resolve this going forward, we plan to explore consistent hashing for the Bucket Index, which would optimise our Parquet file sizes and allow the number of buckets to grow seamlessly as traffic grows.

Impact

Fresh business metrics

Since creating our Hudi data ingestion solution, we have enabled various users, such as our data analysts, to perform ad hoc queries much more easily on data with lower latency. Furthermore, Hudi tables can be seamlessly joined with Hive tables in Trino for additional context. This has enabled the construction of operational dashboards that reflect fresh business metrics to our various operators, empowering them with the information needed to quickly respond to any abnormalities (such as high-demand events like F1 or seasonal holidays).

Quicker fraud detection

Another significant user of our solution is our fraud detection analysts. The solution enables them to rapidly access fresh transaction events and analyse them for fraudulent patterns, particularly when a new attack pattern emerges that has not yet been detected by their rules engine. It also allows them to run multiple ad hoc queries involving lookbacks over several days' worth of data without impacting our production RDS and Kafka clusters, by using the data lake as the data interface. This reduces data latency to the minute level and, in turn, empowers them to respond more quickly to attacks.

What’s next?

As the landscape of data storage solutions evolves rapidly, we are eager to test and integrate new features like Record Level Indexing and the creation of Pre Join tables. This evolution extends beyond the Hudi community to other table formats such as Iceberg and Delta Lake. We remain ready to adapt to these changes and incorporate the advantages of each format into our data lake within Grab.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Real-Time In-Stream Inference with AWS Kinesis, SageMaker & Apache Flink

Post Syndicated from Shawn Sachdev original https://aws.amazon.com/blogs/architecture/realtime-in-stream-inference-kinesis-sagemaker-flink/

As businesses race to digitally transform, they must cope with ever-growing volumes of data whose value diminishes over time. The challenge is to analyze, learn, and infer from real-time data to predict future states, as well as to detect anomalies and get accurate results. In this blog post, we'll explain the architecture for a solution that can achieve real-time inference on streaming data. We'll also cover the integration of Amazon Kinesis Data Analytics (KDA) with Apache Flink to asynchronously invoke underlying services (or databases).

Managed real-time in-stream data inference is quite a mouthful; let’s break it up:

  • In-stream data processing refers to the capability of collecting, processing, and analyzing data as it flows through a data stream.
  • Real-time inference refers to the ability to use data from the feed to project the future state of the underlying data.

Consider a streaming application that captures credit card transactions along with other parameters (such as the source IP, to capture the geographic details of the transaction, as well as the amount). This data can then be used to infer fraudulent transactions instantaneously. Compare that to a traditional batch-oriented approach that identifies fraudulent transactions at the end of every business day and generates a report when it's too late, after bad actors have already committed fraud.

Architecture overview

In this post, we discuss how you can use Amazon Kinesis Data Analytics for Apache Flink (KDA), Amazon SageMaker, Apache Flink, and Amazon API Gateway to address challenges such as real-time fraud detection on a stream of credit card transaction data. We explore how to build a managed, reliable, scalable, and highly available streaming architecture based on managed services that substantially reduce the operational overhead compared to a self-managed environment. Our particular focus is on how to prepare and run Flink applications with KDA for Apache Flink.

The following diagram illustrates this architecture:

In the above architecture, data is ingested into Amazon Kinesis Data Streams (KDS) using the Amazon Kinesis Producer Library (KPL), and you can use any ingestion pattern supported by KDS. KDS then streams the data to an Apache Flink-based KDA application. KDA manages the required infrastructure for Flink, scales the application in response to changing traffic patterns, and automatically recovers from underlying failures. The Flink application is configured to call an API Gateway endpoint using Asynchronous I/O. Residing behind the API Gateway is an Amazon SageMaker endpoint, but any endpoint can be used based on your data enrichment needs. Flink distributes the data across one or more stream partitions, and user-defined operators can transform the data stream.

Let’s talk about some of the key pieces of this architecture.

What is Apache Flink?

Apache Flink is an open source distributed processing framework tailored to stateful computations over unbounded and bounded datasets. The architecture uses KDA with Apache Flink to run in-stream analytics, and uses the Asynchronous I/O operator to interact with external systems.

KDA and Apache Flink

KDA for Apache Flink is a fully managed AWS service that enables you to use an Apache Flink application to process streaming data. With KDA for Apache Flink, you can use Java or Scala to process and analyze streaming data. The service enables you to author and run code against streaming sources. KDA provides the underlying infrastructure for your Flink applications. It handles core capabilities like provisioning compute resources, parallel computation, automatic scaling, and application backups (implemented as checkpoints and snapshots).

Flink Asynchronous I/O Operator

Flink's Asynchronous I/O operator allows you to use asynchronous request clients for external systems to enrich stream events or perform computation. Asynchronous interaction with the external system means that a single parallel function instance can handle multiple requests and receive the responses concurrently. In most cases this leads to higher streaming throughput. The Asynchronous I/O API integrates well with data streams, and handles ordering, event time, fault tolerance, and so on. You can configure this operator to call external sources like databases and APIs. The architecture pattern explained in this post is configured to call API Gateway integrated with SageMaker endpoints.

Please refer to the code at kda-flink-ml, a sample Flink application with an implementation of the Asynchronous I/O operator that calls an external SageMaker endpoint via API Gateway. Below is a snippet of code from StreamingJob.java in the sample Flink application.

DataStream<HttpResponse<RideRequest>> predictFareResponse =
            // Asynchronously call the predictFare endpoint behind API Gateway
            AsyncDataStream.unorderedWait(
                predictFareRequests,
                new Sig4SignedHttpRequestAsyncFunction<>(predictFareEndpoint, apiKeyHeader),
                30, TimeUnit.SECONDS,  // timeout before an async request is considered failed
                20                     // capacity: maximum number of in-flight async requests
            )
            .returns(new TypeHint<HttpResponse<RideRequest>>() {});

The operator code above requires the following inputs:

  1. An input data stream
  2. An implementation of AsyncFunction that dispatches the requests to the external system
  3. Timeout, which defines how long an asynchronous request may take before it is considered failed
  4. Capacity, which defines how many asynchronous requests may be in progress at the same time

How Amazon SageMaker fits into this puzzle

In our architecture, we propose a SageMaker endpoint for inference, invoked via API Gateway, that can detect fraudulent transactions.

Amazon SageMaker is a fully managed service that provides every developer and data scientist with the ability to build, train, and deploy machine learning (ML) models quickly. SageMaker removes the heavy lifting from each step of the machine learning process to make it easier to build and develop high-quality models. You can use these trained models in an ingestion pipeline to make real-time inferences.

You can set up persistent endpoints to get predictions from your models that are deployed on SageMaker hosting services. For an overview on deploying a single model or multiple models with SageMaker hosting services, see Deploy a Model on SageMaker Hosting Services.
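For illustration, the sketch below shows roughly what sits behind the API Gateway integration: an invocation of a persistent SageMaker endpoint through the SageMaker Runtime API using the AWS SDK for Java. The endpoint name, content type, and payload are placeholders; in the architecture described in this post, the Flink application reaches the endpoint via API Gateway rather than calling the runtime directly.

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.sagemakerruntime.SageMakerRuntimeClient;
import software.amazon.awssdk.services.sagemakerruntime.model.InvokeEndpointRequest;
import software.amazon.awssdk.services.sagemakerruntime.model.InvokeEndpointResponse;

SageMakerRuntimeClient runtime = SageMakerRuntimeClient.create();

// Hypothetical endpoint name and CSV feature payload for a fraud-detection model.
InvokeEndpointRequest request = InvokeEndpointRequest.builder()
    .endpointName("fraud-detection-endpoint")
    .contentType("text/csv")
    .body(SdkBytes.fromUtf8String("1499.00,203.0.113.7,2021-03-01T12:00:00Z"))
    .build();

InvokeEndpointResponse response = runtime.invokeEndpoint(request);
System.out.println("Prediction: " + response.body().asUtf8String());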

Ready for a test drive

To help you get started, we would like to introduce an AWS Solution: the AWS Streaming Data Solution for Amazon Kinesis (Option 4), available as a single-click AWS CloudFormation template, to help you quickly provision the resources and get your real-time in-stream inference pipeline up and running in a few minutes. In this solution we leverage AWS Lambda, but it can be swapped for a SageMaker endpoint to achieve the architecture discussed earlier in this post. You can also leverage the pre-built AWS Solutions Construct, which implements an Amazon API Gateway connected to an Amazon SageMaker endpoint pattern that can replace AWS Lambda in the solution below. See the implementation guide for this solution.

The following diagram illustrates the architecture for the solution:

Conclusion

In this post we explained the architecture to build a managed, reliable, scalable, and highly available application that is capable of real-time inference on a data stream. The architecture was built using KDS, KDA for Apache Flink, Apache Flink, and Amazon SageMaker. The architecture also illustrates how you can use managed services so that you don't need to spend time provisioning, configuring, and managing the underlying infrastructure. Instead, you can spend your time creating insights and inferences from your data.

We also talked about the AWS Streaming Data Solution for Amazon Kinesis, which is an AWS vetted solution that provides implementations for applications you can automatically deploy directly into your AWS account. The solution automatically configures the AWS services necessary to easily capture, store, process, and infer from streaming data.

Architecting a Data Lake for Higher Education Student Analytics

Post Syndicated from Craig Jordan original https://aws.amazon.com/blogs/architecture/architecting-data-lake-for-higher-education-student-analytics/

One of the keys to identifying timely and impactful actions is having enough raw material to work with. However, this up-to-date information typically lives in the databases that sit behind several different applications. One of the first steps to finding data-driven insights is gathering that information into a single store that an analyst can use without interfering with those applications.

For years, reporting environments have relied on a data warehouse stored in a single, separate relational database management system (RDBMS). But now, due to the growing use of Software as a service (SaaS) applications and NoSQL database options, data may be stored outside the data center and in formats other than tables of rows and columns. It’s increasingly difficult to access the data these applications maintain, and a data warehouse may not be flexible enough to house the gathered information.

For these reasons, reporting teams are building data lakes, and those responsible for data analytics at universities and colleges are no different. However, it can be challenging to know exactly how to start building this expanded data repository so that it is ready to use quickly and still expandable as future requirements are uncovered. Helping higher education institutions address these challenges is the topic of this post.

About Maryville University

Maryville University is a nationally recognized private institution located in St. Louis, Missouri, and was recently named the second fastest growing private university by The Chronicle of Higher Education. Even with its enrollment growth, the university is committed to a highly personalized education for each student, which requires reliable data that is readily available to multiple departments. University leaders want to offer the right help at the right time to students who may be having difficulty completing the first semester of their course of study. To get started, the data experts in the Office of Strategic Information and members of the IT Department needed to create a data environment to identify students needing assistance.

Critical data sources

Like most universities, Maryville’s student-related data centers around two significant sources: the student information system (SIS), which houses student profiles, course completion, and financial aid information; and the learning management system (LMS) in which students review course materials, complete assignments, and engage in online discussions with faculty and fellow students.

The first of these, the SIS, stores its data in an on-premises relational database, and for several years, a significant subset of its contents had been incorporated into the university’s data warehouse. The LMS, however, contains data that the team had not tried to bring into their data warehouse. Moreover, that data is managed by a SaaS application from Instructure, called “Canvas,” and is not directly accessible for traditional extract, transform, and load (ETL) processing. The team recognized they needed a new approach and began down the path of creating a data lake in AWS to support their analysis goals.

Getting started on the data lake

The first step the team took in building their data lake made use of an open source solution that Harvard's IT department developed. The solution, composed of AWS Lambda functions and Amazon Simple Storage Service (S3) buckets, is deployed using AWS CloudFormation. It enables any university that uses Canvas for its LMS to implement a solution that moves LMS data into an S3 data lake on a daily basis. The following diagram illustrates this portion of Maryville's data lake architecture:

Diagram 1: The data lake for the Learning Management System data

The AWS Lambda functions invoke the LMS REST API on a daily schedule, resulting in Maryville's data, which has been previously unloaded and compressed by Canvas, being securely stored as S3 objects. AWS Glue tables are defined to provide access to these S3 objects. Amazon Simple Notification Service (SNS) informs stakeholders of the status of the data loads.

Expanding the data lake

The next step was deciding how to copy the SIS data into S3. The team decided to use the AWS Database Migration Service (DMS) to create daily snapshots of more than 2,500 tables from this database. DMS uses a source endpoint for secure access to the on-premises database instance over VPN. A target endpoint determines the specific S3 bucket into which the data should be written. A migration task defines which tables to copy from the source database along with other migration options. Finally, a replication instance, a fully managed virtual machine, runs the migration task to copy the data. With this configuration in place, the data lake architecture for SIS data looks like this:

Diagram 2: Migrating data from the Student Information System
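As a sketch of what creating the S3 target endpoint described above might look like with the AWS SDK for Java, under assumed bucket and role names (the source endpoint, migration task, and replication instance would be created in a similar way):

import software.amazon.awssdk.services.databasemigration.DatabaseMigrationClient;
import software.amazon.awssdk.services.databasemigration.model.CreateEndpointRequest;
import software.amazon.awssdk.services.databasemigration.model.ReplicationEndpointTypeValue;
import software.amazon.awssdk.services.databasemigration.model.S3Settings;

DatabaseMigrationClient dms = DatabaseMigrationClient.create();

// Hypothetical identifiers, bucket, and IAM role for the S3 target endpoint.
dms.createEndpoint(CreateEndpointRequest.builder()
    .endpointIdentifier("sis-s3-target")
    .endpointType(ReplicationEndpointTypeValue.TARGET)
    .engineName("s3")
    .s3Settings(S3Settings.builder()
        .bucketName("university-data-lake")
        .bucketFolder("sis")
        .serviceAccessRoleArn("arn:aws:iam::123456789012:role/dms-s3-access")
        .build())
    .build());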

Handling sensitive data

In building a data lake, you have several options for handling sensitive data, including:

  • Leaving it behind in the source system and avoiding copying it through the data replication process
  • Copying it into the data lake, but taking precautions to ensure that access to it is limited to authorized staff
  • Copying it into the data lake, but applying processes to eliminate, mask, or otherwise obfuscate the data before it is made accessible to analysts and data scientists

The Maryville team decided to take the first of these approaches. Building the data lake gave them a natural opportunity to assess where this data was stored in the source system and then make changes to the source database itself to limit the number of highly sensitive data fields.

Validating the data lake

With these steps completed, the team turned to the final task, which was to validate the data lake. For this process they chose to make use of Amazon Athena, AWS Glue, and Amazon Redshift. AWS Glue provided multiple capabilities including metadata extraction, ETL, and data orchestration. Metadata extraction, completed by Glue crawlers, quickly converted the information that DMS wrote to S3 into metadata defined in the Glue data catalog. This enabled the data in S3 to be accessed using standard SQL statements interactively in Athena. Without the added cost and complexity of a database, Maryville’s data analyst was able to confirm that the data loads were completing successfully. He was also able to resolve specific issues encountered on particular tables. The SQL queries, written in Athena, could later be converted to ETL jobs in AWS Glue, where they could be triggered on a schedule to create additional data in S3. Athena and Glue enabled the ETL that was needed to transform the raw data delivered to S3 into prepared datasets necessary for existing dashboards.
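A hedged sketch of how such a validation query could be submitted programmatically with the AWS SDK for Java is shown below; the database, table, and output location names are placeholders, and in practice these queries can equally be run interactively in the Athena console:

import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.QueryExecutionContext;
import software.amazon.awssdk.services.athena.model.ResultConfiguration;
import software.amazon.awssdk.services.athena.model.StartQueryExecutionRequest;

AthenaClient athena = AthenaClient.create();

// Hypothetical Glue database, table, and query-results bucket.
String queryExecutionId = athena.startQueryExecution(StartQueryExecutionRequest.builder()
    .queryString("SELECT COUNT(*) AS row_count FROM sis_raw.course_enrollments")
    .queryExecutionContext(QueryExecutionContext.builder().database("sis_raw").build())
    .resultConfiguration(ResultConfiguration.builder()
        .outputLocation("s3://university-athena-results/validation/")
        .build())
    .build())
    .queryExecutionId();

System.out.println("Started Athena query: " + queryExecutionId);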

Once curated datasets were created and stored in S3, the data was loaded into an Amazon Redshift data warehouse, which supported direct access by tools outside of AWS using ODBC/JDBC drivers. This capability enabled Maryville's team to further validate the data by attaching the data in Redshift to existing dashboards that were running in Maryville's own data center. Redshift's stored procedure language allowed the team to port some key ETL logic so that the engineering of these datasets could follow a process similar to approaches used in Maryville's on-premises data warehouse environment.

Conclusion

The overall data lake/data warehouse architecture that the Maryville team constructed currently looks like this:

Diagram 3: The complete architecture

Through this approach, Maryville's two-person team has moved key data into position for use in a variety of workloads. The data in S3 is now readily accessible for ad hoc interactive SQL workloads in Athena, ETL jobs in Glue, and ultimately for machine learning workloads running in Amazon EC2, AWS Lambda, or Amazon SageMaker. In addition, the S3 storage layer is easy to expand without interrupting prior workloads. At the time of this writing, the Maryville team is beginning to use this environment for the machine learning models described earlier, as well as adding other data sources into the S3 layer.

Acknowledgements

The solution described in this post resulted from the collaborative effort of Christine McQuie, Data Engineer, and Josh Tepen, Cloud Engineer, at Maryville University, with guidance from Travis Berkley and Craig Jordan, AWS Solutions Architects.