Tag Archives: Real-time

Enabling near real-time data analytics on the data lake

Post Syndicated from Grab Tech original https://engineering.grab.com/enabling-near-realtime-data-analytics

Introduction

In the domain of data processing, data analysts run their ad hoc queries on the data lake. The lake serves as an interface between our analytics and production environment, preventing downstream queries from impacting upstream data ingestion pipelines. To ensure efficient data processing in the data lake, choosing appropriate storage formats is crucial.

The vanilla data lake solution is built on top of cloud object storage with Hive metastore, where data files are written in Parquet format. Although this setup is optimised for scalable analytics query patterns, it struggles to handle frequent updates to the data due to two reasons:

  1. The Hive table format requires us to rewrite the Parquet files with the latest data. For instance, to update one record in an unpartitioned Hive table, we would need to read all the data, update the record, and write back the entire data set.
  2. Writing Parquet files is expensive due to the overhead of organising the data into a compressed columnar format, which is more complex than a row format.

The issue is further exacerbated by the scheduled downstream transformations. These necessary steps, which clean and process the data for use, increase the latency because the total delay now includes the combined scheduled intervals of these processing jobs.

Fortunately, the introduction of the Hudi format, which supports fast writes by allowing Avro and Parquet files to co-exist on a Merge On Read (MOR) table, opens up the possibility of having a data lake with minimal data latency. The concept of a commit timeline further allows data to be served with Atomicity, Consistency, Isolation, and Durability (ACID) guarantees.

We employ different sets of configurations for the different characteristics of our input sources:

  1. High or low throughput. A high-throughput source refers to one that has a high level of activity. One example of this can be our stream of booking events generated from each customer transaction. On the other hand, a low-throughput source would be one that has a relatively low level of activity. An example of this can be transaction events generated from reconciliation happening on a nightly basis.
  2. Kafka (unbounded) or Relational Database Sources (bounded). Our sinks have sources that can be broadly categorised into unbounded and bounded sources. Unbounded sources are usually related to transaction events materialised as Kafka topics, representing user-generated events as they interact with the Grab superapp. Bounded sources usually refer to Relational Database (RDS) sources, whose size is bounded by the provisioned storage.

The following sections will delve into the differences between each source and our corresponding configurations optimised for them.

High throughput source

For our data sources with high throughput, we have chosen to write to MOR tables, since writing the log files in Avro (row) format allows for fast writes that meet our latency requirements.

Figure 1 Architecture for MOR tables

As seen in Figure 1, we use Flink to perform the stream processing and write out log files in Avro format in our setup. We then set up a separate Spark writer which periodically converts the Avro files into Parquet format in the Hudi compaction process.

We have further simplified the coordination between the Flink and Spark writers by enabling asynchronous services on the Flink writer so it can generate the compaction plans for Spark writers to act on. During each Spark job run, it checks for available compaction plans and acts on them, placing the burden of orchestrating the writes solely on the Flink writer. This approach could help minimise potential concurrency problems that might otherwise arise, as there would be a single actor orchestrating the associated Hudi table services.
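
To make this concrete, here is a minimal sketch (not our actual job) of how such a MOR table could be declared from a Flink job, with the writer scheduling compactions while leaving their execution to the offline Spark job. The table name, path, fields and option values are illustrative, and the option keys should be verified against the Hudi version in use.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MorTableSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Hypothetical MOR table: the Flink writer appends Avro log files and only
        // *schedules* compaction plans; a separate Spark job picks them up and executes them.
        tEnv.executeSql(
            "CREATE TABLE booking_events (" +
            "  event_id   STRING," +
            "  payload    STRING," +
            "  event_hour STRING," +
            "  PRIMARY KEY (event_id) NOT ENFORCED" +
            ") PARTITIONED BY (event_hour) WITH (" +
            "  'connector' = 'hudi'," +
            "  'path' = 's3://example-bucket/booking_events'," +   // illustrative path
            "  'table.type' = 'MERGE_ON_READ'," +
            "  'compaction.schedule.enabled' = 'true'," +          // Flink writer generates compaction plans
            "  'compaction.async.enabled' = 'false'" +             // ...but does not execute them itself
            ")");
    }
}
```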

Low throughput source

Figure 2 Architecture for COW tables

For low throughput sources, we gravitate towards Copy On Write (COW) tables given the simplicity of the design: it involves only one component, the Flink writer. The downside is higher data latency, because this setup only generates Parquet data snapshots at each checkpoint interval, which is typically about 10-15 minutes.
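
As a rough sketch (values and option keys are assumptions, not our exact configuration), the main differences from the MOR setup above are the table type and the Flink checkpoint interval, which effectively sets the floor on data latency:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CowTableSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A COW table produces a Parquet snapshot at every checkpoint, so a 15-minute
        // checkpoint interval means roughly 15 minutes of data latency.
        env.enableCheckpointing(15 * 60 * 1000L); // milliseconds

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(
            "CREATE TABLE reconciliation_events (" +
            "  txn_id  STRING," +
            "  payload STRING," +
            "  PRIMARY KEY (txn_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector'  = 'hudi'," +
            "  'path'       = 's3://example-bucket/reconciliation_events'," + // illustrative path
            "  'table.type' = 'COPY_ON_WRITE'" +
            ")");
    }
}
```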

Connecting to our Kafka (unbounded) data source

Grab uses Protobuf as our central data format in Kafka, ensuring schema evolution compatibility. However, the derivation of the schema of these topics still requires some transformation to make it compatible with Hudi’s accepted schema. Some of these transformations include ensuring that Avro record fields do not contain just a single array field, and handling logical decimal schemas to transform them to fixed byte schema for Spark compatibility.

Given the unbounded nature of the source, we decided to partition it by Kafka event time up to the hour level. This ensured that our Hudi operations would be faster. Parquet file writes would be faster since they would only affect files within the same partition, and each Parquet file within the same event time partition would have a bounded size given the monotonically increasing nature of Kafka event time.

By partitioning tables by Kafka event time, we can further optimise compaction planning operations, since the amount of file lookups required is now reduced with the use of BoundedPartitionAwareCompactionStrategy. Only log files in recent partitions would be selected for compaction and the job manager need not list every partition to figure out which log files to select for compaction during the planning phase anymore.
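
A hedged sketch of the knobs involved is shown below; the keys are Hudi write configurations that would be passed to the writer and the compaction job, and both the option names and the strategy class path should be double-checked against the Hudi release in use.

```java
import java.util.Map;

public class CompactionPlanningOptions {
    public static void main(String[] args) {
        // Illustrative Hudi write configs layered on top of the MOR table shown earlier.
        Map<String, String> options = Map.of(
            // Partition by Kafka event time truncated to the hour, so writes and
            // compaction only touch files in recent partitions.
            "hoodie.datasource.write.partitionpath.field", "event_hour",
            // Plan compactions only for log files in recent, bounded partitions
            // instead of listing every partition of the table.
            "hoodie.compaction.strategy",
            "org.apache.hudi.table.action.compact.strategy.BoundedPartitionAwareCompactionStrategy"
        );
        options.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```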

Connecting to our RDS (bounded) data source

For our RDS, we decided to use the Flink Change Data Capture (CDC) connectors by Ververica to obtain the binlog streams. The RDS would then treat the Flink writer as a replication server and start streaming its binlog data to it for each MySQL change. The Flink CDC connector presents the data as a Kafka Connect (KC) Source record, since it uses the Debezium connector under the hood. It is then a straightforward task to deserialise these records and transform them into Hudi records, since the Avro schema and associated data changes are already captured within the KC source record.
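
For illustration, a minimal Flink CDC source built with the Ververica connector might look like the sketch below. Hostnames, credentials, database and table names are placeholders, and the exact builder API depends on the flink-cdc-connectors version; in our pipelines the resulting change records would then be transformed into Hudi records rather than printed.

```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlCdcSketch {
    public static void main(String[] args) throws Exception {
        // The RDS treats this job as a replication client and streams its binlog to it.
        MySqlSource<String> source = MySqlSource.<String>builder()
            .hostname("mysql.example.internal")   // placeholder host
            .port(3306)
            .databaseList("orders_db")            // placeholder database
            .tableList("orders_db.orders")        // placeholder table
            .username("cdc_reader")
            .password("***")
            // Change events arrive in Debezium / Kafka Connect source record form.
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-cdc")
           .print();
        env.execute("mysql-cdc-sketch");
    }
}
```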

The obtained binlog timestamp is also emitted as a metric during consumption for us to monitor the observed data latency at the point of ingestion.

Optimising for these sources involves two phases:

  1. First, assigning more resources for the cold start incremental snapshot process where Flink takes a snapshot of the current data state in the RDS and loads the Hudi table with that snapshot. This phase is usually resource-heavy as there are a lot of file writes and data ingested during this process.
  2. Once the snapshotting is completed, Flink would then start to process the binlog stream and the observed throughput would drop to a level similar to the DB write throughput. The resources required by the Flink writer at this stage would be much lower than in the snapshot phase.

Indexing for Hudi tables

Indexing is important for upserting Hudi tables when the writing engine performs updates, allowing it to efficiently locate the file groups of the data to be updated.

As of version 0.14, the Flink engine only supports Bucket Index or Flink State Index. Bucket Index performs indexing of the file record by hashing the record key and matching it to a specific bucket of files indicated by the naming convention of the written data files. Flink State Index on the other hand stores the index map of record keys to files in memory.

Given that our tables include unbounded Kafka sources, there is a possibility for our state indexes to grow indefinitely. Furthermore, the requirement of state preservation for Flink State Index across version deployments and configuration updates adds complexity to the overall solution.

Thus, we opted for the Bucket Index, given its simplicity and the fact that our Hudi table size per partition does not change drastically across the week. However, this comes with a limitation: the number of buckets cannot be updated easily, which imposes a limit on the parallelism to which our Flink pipelines can scale. As traffic grows organically, we would eventually find our configuration obsolete and unable to handle the increased load.
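
For reference, enabling a Bucket Index typically comes down to a couple of table options similar to the hedged sketch below; the option keys and the bucket count are assumptions and should be validated against the Hudi version in use.

```java
import java.util.Map;

public class BucketIndexOptions {
    public static void main(String[] args) {
        // Illustrative Hudi-on-Flink table options for a Bucket Index.
        Map<String, String> options = Map.of(
            // Hash the record key into a fixed set of file-group buckets.
            "index.type", "BUCKET",
            // Fixed bucket count: hard to change later, and it caps the useful write parallelism.
            "hoodie.bucket.index.num.buckets", "64"
        );
        options.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```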

To resolve this going forward, using consistent hashing for the Bucket Index would be something to explore to optimise our Parquet file sizes and allow the number of buckets to grow seamlessly as traffic grows.

Impact

Fresh business metrics

Post creation of our Hudi Data Ingestion solution, we have enabled various users such as our data analysts to perform ad hoc queries much more easily on data that has lower latency. Furthermore, Hudi tables can be seamlessly joined with Hive tables in Trino for additional context. This enabled the construction of operational dashboards reflecting fresh business metrics to our various operators, empowering them with the necessary information to quickly respond to any abnormalities (such as high-demand events like F1 or seasonal holidays).

Quicker fraud detection

Another significant user of our solution is our fraud detection analysts. Our solution enabled them to rapidly access fresh transaction events and analyse them for fraudulent patterns, particularly during the emergence of a new attack pattern that hadn’t been detected by their rules engine. It also allowed them to perform multiple ad hoc queries involving lookbacks over several days’ worth of data without impacting our production RDS and Kafka clusters, by using the data lake as the data interface. This reduced the data latency to the minute level and, in turn, empowered them to respond more quickly to attacks.

What’s next?

As the landscape of data storage solutions evolves rapidly, we are eager to test and integrate new features like Record Level Indexing and the creation of Pre Join tables. This evolution extends beyond the Hudi community to other table formats such as Iceberg and Delta Lake. We remain ready to adapt to these changes and incorporate the advantages of each format into our data lake within Grab.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Build real-time video and audio apps on the world’s most interconnected network

Post Syndicated from Zaid Farooqui original https://blog.cloudflare.com/announcing-cloudflare-calls/

Build real-time video and audio apps on the world’s most interconnected network

In the last two years, there has been a rapid rise in real-time apps that help groups of people get together virtually with near-zero latency. User expectations have also increased: your users expect real-time video and audio features to work flawlessly. We found that developers building real-time apps want to spend less time building and maintaining low-level infrastructure. Developers also told us they want to spend more time building features that truly make their idea special.

So today, we are announcing a new product that lets developers build real-time audio/video apps. Cloudflare Calls exposes a set of APIs that allows you to build things like:

  • A video conferencing app with a custom UI
  • An interactive conversation where the moderators can invite select audience members “on stage” as speakers
  • A privacy-first group workout app where only the instructor can view all the participants while the participants can only view the instructor
  • Remote ‘fireside chats’ where one or multiple people can have a video call with an audience of 10,000+ people in real time (<100ms delay)

The protocol that makes all these possible is WebRTC. And Cloudflare Calls is the product that abstracts away the complexity by turning the Cloudflare network into a “super peer,” helping you build reliable and secure real-time experiences.

What is WebRTC?

WebRTC is a peer-to-peer protocol that enables two or more users’ devices to talk to each other directly and without leaving the browser. In a native implementation, peer-to-peer typically works well for 1:1 calls with only two participants. But as you add additional participants, it is common for participants to experience reliability issues, including video freezes and participants getting out of sync. Why? Because as the number of participants increases, the coordination overhead between users’ devices also increases. Each participant needs to send media to every other participant, so the amount of data each computer must upload grows quickly with the size of the call.

A selective forwarding unit (SFU) solves this problem. An SFU is a system that connects users with each other in real-time apps by intelligently managing and routing video and audio data between the participants. Apps that use an SFU reduce the data capacity required from each user because each user doesn’t have to send data to every other user. SFUs are required parts of a real-time application when the applications need to determine who is currently speaking or when they want to send appropriate resolution video when WebRTC simulcast is used.

Beyond SFUs

The centralized nature of an SFU is also its weakness. A centralized WebRTC server needs a region, which means that it will be slow in most parts of the world for most users while being fast for only a few select regions.

Typically, SFUs are built on public clouds. They consume a lot of bandwidth by both receiving and sending high resolution media to many devices. And they come with significant devops overhead requiring your team to manually configure regions and scalability.

We realized that merely offering an SFU-as-a-service wouldn’t solve the problem of cost and bandwidth efficiency.

Biggest WebRTC server in the world

When you are on a five-person video call powered by a classic WebRTC implementation, each person’s device talks directly with each other. In WebRTC parlance, each of the five participants is called a peer. And the reliability of the five-person call will only be as good as the reliability of the person (or peer) with the weakest Internet connection.

We built Calls with a simple premise: “What if Cloudflare could act as a WebRTC peer?”. Calls is a “super peer”, or a “giant server that spans the whole world”, that allows applications to be built beyond the limitations of the lowest common denominator peer or a centralized SFU. Developers can focus on the strength of their app instead of trying to compensate for the weaknesses of the weakest peer in a p2p topology.

Calls does not use the traditional SFU topology where every participant connects to a centralized server in a single location. Instead, each participant connects to their local Cloudflare data center. When another participant wants to retrieve that media, the data center that hosts the original media stream is located and the tracks are forwarded between data centers automatically. If two participants are physically close, their media does not travel around the world to a centralized region; instead, they use the same data center, greatly reducing latency and improving reliability.

Calls is a configurable, global, regionless WebRTC server that is the size of Cloudflare’s ever-growing network. The WebRTC protocol enables peers to send and receive media tracks. When you are on a video call, your computer is typically sending two tracks: one that contains the audio of you speaking and another that contains the video stream from your camera. Calls implements the WebRTC RTCPeerConnection API across the Cloudflare Network where users can push media tracks. Calls also exposes an API where other media tracks can be requested within the same Peer Connection context.

Build real-time video and audio apps on the world’s most interconnected network

Cloudflare Calls will be a good solution if you operate your own WebRTC server such as Janus or MediaSoup. Cloudflare Calls can also replace existing deployments of Janus or MediaSoup, especially in cases where you have clients connecting globally to a single, centralized deployment.

Region: Earth

Building and maintaining your own real-time infrastructure comes with unique architecture and scaling challenges. It requires you to answer, and constantly revise your answers to, thorny questions such as “which regions do we support?”, “how many users do we need to justify spinning up more infrastructure in yet another cloud region?”, “how do we scale for unplanned spikes in usage?” and “how do we not lose money during low-usage hours of our infrastructure?”

Cloudflare Calls eliminates the need to answer these questions. Calls uses anycast for every connection, so every packet is always routed to the closest Cloudflare location. It is global by nature: your users are automatically served from a location close to them. Calls scales with your use and your team doesn’t have to build its own auto-scaling logic.

Calls runs on every Cloudflare location and every single Cloudflare server. Because the Cloudflare network is within 10 milliseconds of 90% of the world’s population, it does not add any noticeable latency.

Answer “where’s the problem?”, only faster

When we talk to customers with existing WebRTC workloads, there is one consistent theme: customers wish it was easier to troubleshoot issues. When a group of people are talking over a video call, the stakes are much higher when users experience issues. When a web page fails to load, it is common for users to simply retry after a few minutes. When a video call is disrupted, it is often the end of the call.

Cloudflare Calls’ focus on observability will help customers get to the bottom of the issues faster. Because Calls is built on Cloudflare’s infrastructure, we have end-to-end visibility from all layers of the OSI model.

Calls provides a server-side view of the WebRTC Statistics API, so you can drill into issues with each Peer Connection and the flow of media within it, without depending only on data sent from clients. We chose this because the Statistics API is a standardized place developers are used to getting information about their experience. It is the same API available in browsers, and you might already be using it today to gain insight into the performance of your WebRTC connections.

Privacy and security at the core

Calls eliminates the need for participants to share information such as their IP address with each other. Let’s say you are building an app that connects therapists and patients via video calls. With a traditional WebRTC implementation, both the patient and therapist’s devices would talk directly with each other, leading to exposure of potentially sensitive data such as the IP address. Exposure of information such as the IP address can leave your users vulnerable to denial-of-service attacks.

When using Calls, you are still using WebRTC, but the individual participants are connecting to the Cloudflare network. If four people are on a video call powered by Cloudflare Calls, each of the four participants’ devices will be talking only with the Cloudflare network. To your end users, the experience will feel just like a peer-to-peer call, only with added security and privacy upside.

Finally, all video and audio traffic that passes through Cloudflare Calls is encrypted by default. Calls leverages existing Cloudflare products, including Argo, to route the video and audio content in a secure and efficient manner. The Calls API enables granular controls that cannot be implemented with vanilla WebRTC alone. When you build using Calls, you are only limited by your imagination, not the technology.

What’s next

We’re releasing Cloudflare Calls in closed beta today. To try out Cloudflare Calls, request an invitation and check your inbox in the coming weeks.
Calls will be free during the beta period. We’re looking to work with early customers who want to take Calls from beta to generally available with us. If you are building a real-time video app today, having challenges scaling traditional WebRTC infrastructure, or just have a great idea you want to explore, leave a comment when you are requesting an invitation, and we’ll reach out.

How Kafka Connect helps move data seamlessly

Post Syndicated from Grab Tech original https://engineering.grab.com/kafka-connect

Grab’s real-time data platform team, a.k.a. Coban, has written about Plumbing at scale, Optimally scaling Kafka consumer applications, and Exposing Kafka via VPCE. In this article, we will cover the importance of being able to easily move data in and out of Kafka in a low-code way and how we achieved this with Kafka Connect.

To build a NoOps managed streaming platform in Grab, the Coban team has:

  • Engineered an ecosystem on top of Apache Kafka.
  • Successfully adopted it to production for both transactional and analytical use cases.
  • Made it a battle-tested, industry-standard platform.

In 2021, the Coban team embarked on a new journey (Kafka Connect) that enables and empowers Grabbers to move data in and out of Apache Kafka seamlessly and conveniently.

Kafka Connect stack in Grab

This is what Coban’s Kafka Connect stack looks like today. Multiple data sources and data sinks, such as MySQL, S3 and Azure Data Explorer, have already been supported and productionised.

Kafka Connect stack in Grab

The Coban team has been using Protobuf as the serialisation-deserialisation (SerDes) format in Kafka. Therefore, the role of Confluent schema registry (shown at the top of the figure) is crucial to the Kafka Connect ecosystem, as it serves as the building block for conversions such as Protobuf-to-Avro, Protobuf-to-JSON and Protobuf-to-Parquet.

What problems are we trying to solve?

Problem 1: Change Data Capture (CDC)

In a big organisation like Grab, we handle large volumes of data and changes across many services on a daily basis, so it is important for these changes to be reflected in real time.

In addition, there are other technical challenges to be addressed:

  1. As shown in the figure below, data is written twice in the code base – once into the database (DB) and once as a message into Kafka. In order for the data in the DB and Kafka to be consistent, the two writes have to be atomic in a two-phase commit protocol (or other atomic commitment protocols), which is non-trivial and impacts availability.
  2. Some use cases require data both before and after a change.
Change Data Capture flow

Problem 2: Message mirroring for disaster recovery

The Coban team has done some research on Kafka MirrorMaker, an open-source solution. While it can ensure better data consistency, it takes significant effort to adopt it onto existing Kubernetes infrastructure hosted by the Coban team and achieve high availability.

Another major challenge that the Coban team faces is offset mirroring and translation, which is a known challenge in Kafka communities. In order for Kafka consumers to seamlessly resume their work with a backup Kafka after a disaster, we need to cater for offset translation.

Data ingestion into Azure Event Hubs

Azure Event Hubs has a Kafka-compatible interface and natively supports JSON and Avro schema. The Coban team uses Protobuf as the SerDes framework, which is not supported by Azure Event Hubs. It means that conversions have to be done for message ingestion into Azure Event Hubs.

Solution

To tackle these problems, the Coban team has picked Kafka Connect because:

  1. It is an open-source framework with a relatively big community that we can consult if we run into issues.
  2. It has the ability to plug in transformations and custom conversion logic.

Let us see how Kafka Connect can be used to resolve the previously mentioned problems.

Kafka Connect with Debezium connectors

Debezium is a framework built for capturing data changes on top of Apache Kafka and the Kafka Connect framework. It provides a series of connectors for various databases, such as MySQL, MongoDB and Cassandra.

Here are the benefits of MySQL binlog streams:

  1. They not only provide changes on data, but also give snapshots of data before and after a specific change.
  2. Some producers no longer have to push a message to Kafka after writing a row to a MySQL database. With Debezium connectors, services can choose not to deal with Kafka and only handle MySQL data stores.
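
To make the setup concrete, registering a Debezium MySQL connector against a Kafka Connect cluster is essentially a POST of a JSON configuration to the Connect REST API. The sketch below is illustrative rather than Grab’s actual configuration: endpoints, credentials and table lists are placeholders, and several key names changed between Debezium 1.x and 2.x.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterDebeziumConnector {
    public static void main(String[] args) throws Exception {
        // Illustrative connector config using Debezium 1.x style keys.
        String config = """
            {
              "name": "orders-cdc",
              "config": {
                "connector.class": "io.debezium.connector.mysql.MySqlConnector",
                "database.hostname": "mysql.example.internal",
                "database.port": "3306",
                "database.user": "cdc_reader",
                "database.password": "***",
                "database.server.id": "184054",
                "database.server.name": "orders_db_server",
                "table.include.list": "orders_db.orders",
                "database.history.kafka.bootstrap.servers": "kafka.example.internal:9092",
                "database.history.kafka.topic": "schema-changes.orders_db"
              }
            }""";

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://connect.example.internal:8083/connectors")) // placeholder endpoint
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(config))
            .build();

        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```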

Architecture

Kafka Connect architecture

In case of DB upgrades and outages

DB Data Definition Language (DDL) changes, migrations, splits and outages are common in database operations, and each operation type has a systematic resolution.

The Debezium connector has built-in features to handle DDL changes made by DB migration tools, such as pt-online-schema-change, which is used by the Grab DB Ops team.

To deal with MySQL instance changes and database splits, the Coban team leverages the Kafka Connect framework’s ability to change the offsets of connectors. By changing the offsets, Debezium connectors can properly function after DB migrations and resume binlog synchronisation from any position in any binlog file on a MySQL instance.

Database upgrades and outages

Refer to the Debezium documentation for more details.

Success stories

The CDC project on MySQL via Debezium connectors has been greatly successful in Grab. One of the biggest examples is its adoption in the Elasticsearch optimisation carried out by GrabFood, which has been published in another blog.

MirrorMaker2 with offset translation

Kafka MirrorMaker2 (MM2), developed in and shipped together with the Apache Kafka project, is a utility to mirror messages and consumer offsets. However, in the Coban team, the MM2 stack is deployed on the Kafka Connect framework per connector because:

  1. A few Kafka Connect clusters have already been provisioned.
  2. Compared to launching the three connectors bundled in MM2, Coban can have finer control over MirrorSourceConnector and MirrorCheckpointConnector, and manage both of them in an infrastructure-as-code way via HashiCorp Terraform.
MirrorMaker2 flow
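
A hedged sketch of what deploying the two connectors on an existing Connect cluster could look like is shown below; cluster aliases, bootstrap servers and topic or group filters are placeholders, and offset-sync related options differ across Kafka versions.

```java
public class MirrorMaker2OnConnect {
    public static void main(String[] args) {
        // Mirrors messages matching the topic filter from the primary cluster to the backup cluster.
        String mirrorSource = """
            {
              "name": "mm2-source",
              "config": {
                "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
                "source.cluster.alias": "primary",
                "target.cluster.alias": "backup",
                "source.cluster.bootstrap.servers": "kafka-primary.example.internal:9092",
                "target.cluster.bootstrap.servers": "kafka-backup.example.internal:9092",
                "topics": "orders.*"
              }
            }""";

        // Mirrors and translates consumer group offsets so consumers can resume on the backup cluster.
        String mirrorCheckpoint = """
            {
              "name": "mm2-checkpoint",
              "config": {
                "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
                "source.cluster.alias": "primary",
                "target.cluster.alias": "backup",
                "source.cluster.bootstrap.servers": "kafka-primary.example.internal:9092",
                "target.cluster.bootstrap.servers": "kafka-backup.example.internal:9092",
                "groups": ".*"
              }
            }""";

        // In practice, both payloads would be POSTed to the Connect REST API,
        // as in the Debezium sketch earlier, or rendered from Terraform.
        System.out.println(mirrorSource);
        System.out.println(mirrorCheckpoint);
    }
}
```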

Success stories

Ensuring business continuity is a key priority for Grab and this includes the ability to recover from incidents quickly. In 2021H2, there was a campaign that ran across many teams to examine the readiness and robustness of various services and middlewares. Coban’s Kafka is one of these services that proved to be robust after rounds of chaos engineering. With MM2 on Kafka Connect to mirror both messages and consumer offsets, critical services and pipelines could safely be replicated and launched across AWS regions if outages occur.

Because the Coban team has proven itself as the battle-tested Kafka service provider in Grab, other teams have also requested to migrate streams from self-managed Kafka clusters to ones managed by Coban. MM2 has been used in such migrations and brought zero downtime to the streams’ producers and consumers.

Mirror to Azure Event Hubs with an in-house converter

The Analytics team runs some real-time ingestion and analytics projects on Azure. To support this cross-cloud use case, the Coban team has adopted MM2 for message mirroring to Azure Event Hubs.

Event Hubs typically only accepts JSON and Avro payloads, which is incompatible with the existing Protobuf-based SerDes framework. The Coban team has therefore developed a custom converter that converts bytes serialised in Protobuf to JSON bytes at runtime.

These steps explain how the converter works:

  1. Deserialise bytes in Kafka to a Protobuf DynamicMessage according to a schema retrieved from the Confluent™ schema registry.
  2. Perform a recursive post-order depth-first-search on each field descriptor in the DynamicMessage.
  3. Convert every Protobuf field descriptor to a JSON node.
  4. Serialise the root JSON node to bytes.

The converter has not been open sourced yet.
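
Since the converter itself is not public, the sketch below is only an illustration of the approach described above, starting from a DynamicMessage that has already been deserialised (step 1) and using the Protobuf DynamicMessage API with Jackson; handling of maps, enums and bytes is deliberately simplified.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;

import java.util.List;
import java.util.Map;

/** Minimal sketch: convert a Protobuf DynamicMessage into JSON bytes via a Jackson tree. */
public final class ProtoToJsonSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static byte[] toJsonBytes(DynamicMessage message) throws Exception {
        // Step 4: serialise the assembled root JSON node to bytes.
        return MAPPER.writeValueAsBytes(toNode(message));
    }

    private static JsonNode toNode(DynamicMessage message) {
        ObjectNode node = MAPPER.createObjectNode();
        // Steps 2-3: walk every populated field descriptor and convert it to a JSON node.
        for (Map.Entry<FieldDescriptor, Object> entry : message.getAllFields().entrySet()) {
            FieldDescriptor field = entry.getKey();
            Object value = entry.getValue();
            if (field.isRepeated()) {
                ArrayNode array = node.putArray(field.getName());
                for (Object item : (List<?>) value) {
                    array.add(toValueNode(field, item));
                }
            } else {
                node.set(field.getName(), toValueNode(field, value));
            }
        }
        return node;
    }

    private static JsonNode toValueNode(FieldDescriptor field, Object value) {
        // Depth-first recursion into nested messages: children are converted before
        // the parent object node is assembled (post-order).
        if (field.getJavaType() == FieldDescriptor.JavaType.MESSAGE) {
            return toNode((DynamicMessage) value);
        }
        // Scalars, enums and bytes are rendered via their string form for brevity;
        // a real converter would map numeric and boolean types to native JSON types.
        return MAPPER.getNodeFactory().textNode(String.valueOf(value));
    }
}
```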

Deployment

Docker containers are the Coban team’s preferred infrastructure, especially since some production Kafka clusters are already deployed on Kubernetes. The long-term goal is to provide Kafka in a software-as-a-service (SaaS) model, which is why Kubernetes was picked. The diagram below illustrates how Kafka Connect clusters are built and deployed.

Terraform for connectors

What’s next?

The Coban team is iterating on a unified control plane to manage resources like Kafka topics, clusters and Kafka Connect. In the foreseeable future, internal users should be able to provision Kafka Connect connectors via RESTful APIs and a graphical user interface (GUI).

At the same time, the Coban team is closely working with the Data Engineering team to make Kafka Connect the preferred tool in Grab for moving data in and out of external storages (S3 and Apache Hudi).

Coban is hiring!

The Coban (Real-time Data Platform) team at Grab in Singapore is hiring software and site reliability engineers at all levels as we double down on growing our platform capabilities.

Join us in building state-of-the-art, mission critical, TB/hour scale data platforms that enable thousands of engineers, data scientists, and analysts to serve millions of consumers, businesses, and partners across Southeast Asia!

Join us

Grab is a leading superapp in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across over 400 cities in eight countries.
Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Welcome to Speed Week and a Waitless Internet

Post Syndicated from John Graham-Cumming original https://blog.cloudflare.com/fastest-internet/

Welcome to Speed Week and a Waitless Internet

No one likes to wait. Internet impatience is something we all suffer from.

Waiting for an app to update to show when your lunch is arriving; a website that loads slowly on your phone; a movie that hasn’t started to play… yet.

But building a waitless Internet is hard. And that’s where Cloudflare comes in. We’ve built the global network for Internet applications, be they websites, IoT devices or mobile apps. And we’ve optimized it to cut the wait.

If you believe ISP advertising then you’d think that bandwidth (100Mbps! 1Gbps! 2Gbps!) is the be all and end all of Internet speed. That’s a small component of what it takes to deliver the always on, instant experience we want and need.

The reality is you need three things: ample bandwidth, content and applications close to the end user, and software that is as fast as possible. Simple really. Except not, because all three things require a lot of work at different layers.

In this blog post I’ll look at the factors that go into building our fast global network: bandwidth, latency, reliability, caching, cryptography, DNS, preloading, cold starts, and more; and how Cloudflare zeroes in on the most powerful number there is: zero.

I will focus on what happens when you visit a website but most of what I say below applies to the fitness tracker on your wrist sending information up to the cloud, your smart doorbell alerting you to a visitor, or an app getting you the weather forecast.

Faster than the speed of sight

Imagine for a moment you are about to type in the name of a website on your phone or computer. You’ve heard about an exciting new game “Silent Space Marine” and type in silentspacemarine.com.

The very first thing your computer does is translate that name into an IP address. Since computers do absolutely everything with numbers under the hood this “DNS lookup” is the first necessary step.

It involves your computer asking a recursive DNS resolver for the IP address of silentspacemarine.com. That’s the first opportunity for slowness. If the lookup is slow everything else will be slowed down because nothing can start until the IP address is known.
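
To make that first step concrete, here’s roughly what a lookup looks like from code; this resolves through whatever recursive resolver the operating system is configured to use, and the result may already be cached.

```java
import java.net.InetAddress;

public class DnsLookup {
    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        // Ask the configured recursive resolver (or a local cache) for the IP address.
        InetAddress address = InetAddress.getByName("silentspacemarine.com");
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(address.getHostAddress() + " resolved in ~" + elapsedMs + " ms");
    }
}
```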

The DNS resolver you use might be one provided by your ISP, or you might have changed it to one of the free public resolvers like Google’s 8.8.8.8. Cloudflare runs the world’s fastest DNS resolver, 1.1.1.1, and you can use it too. Instructions are here.

With fast DNS name resolution set up your computer can move on to the next step in getting the web page you asked for.

Aside: how fast is fast? One way to think about that is to ask yourself how fast you are able to perceive something change. Research says that the eye can make sense of an image in 13ms. High quality video shows at 60 frames per second (about 16ms per image). So the eye is fast!

What that means for the web is that we need to be working in tens of milliseconds not seconds otherwise users will start to see the slowness.

Slowly, desperately slowly it seemed to us as we watched

Why is Cloudflare’s 1.1.1.1 so fast? Not to downplay the work of the engineering team who wrote the DNS resolver software and made it fast, but two things help make it zoom: caching and closeness.

Caching means keeping a copy of data that hasn’t changed, so you don’t have to go ask for it. If lots of people are playing Silent Space Marine then a DNS resolver can keep its IP address in cache so that when a computer asks for the IP address the software can reply instantly. All good DNS resolvers cache information for speed.

But what happens if the IP address isn’t in the resolver’s cache? This happens the first time someone asks for it, or after a timeout period when the resolver needs to check that the IP address hasn’t changed. In order to get the IP address the resolver asks an authoritative DNS server for the information. That server is ‘authoritative’ for a specific domain (like silentspacemarine.com) and knows the correct IP address.

Since DNS resolvers sometimes have to ask authoritative servers for IP addresses it’s also important that those servers are fast too. That’s one reason why Cloudflare runs one of the world’s largest and fastest authoritative DNS services. Slow authoritative DNS could be another reason an end user has to wait.

So much for caching; what about ‘closeness’? Here’s the problem: the speed of light is really slow. Yes, I know everyone tells you that the speed of light is really fast, but that’s because us sentient water-filled carbon lifeforms can’t move very fast.

But electrons shooting through wires, and lasers blasting data down fiber optic cables, send data at or close to light speed. And sadly light speed is slow. And this slowness shows up because in order to get anything on the Internet you need to go back and forth to a server (many, many times).

In the best case of asking for silentspacemarine.com and getting its IP address there’s one roundtrip:

“Hello, can you tell me the address of silentspacemarine.com?”
“Yes, it’s…”

Even if you made the DNS resolver software instantaneous you’d pay the price of the speed of light. Sounds crazy, right? Here’s a quick calculation. Let’s imagine at home I have fiber optic Internet and the nearest DNS resolver to me is in a city 100 km away. And somehow my ISP has laid the straightest fiber cable from me to the DNS resolver.

The speed of light in fiber is roughly 200,000,000 meters per second. The round trip would be 200,000 meters, and so in the best possible case a whole millisecond has been eaten up by the speed of light. Now imagine any worse case and the speed of light starts eating into the speed of sight.
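
The same back-of-the-envelope arithmetic, spelled out (the 100 km distance and the speed of light in fiber are the assumptions from the paragraph above):

```java
public class LightSpeedRoundTrip {
    public static void main(String[] args) {
        double fiberMetersPerSecond = 200_000_000.0; // roughly the speed of light in fiber
        double oneWayMeters = 100_000.0;             // 100 km to the nearest resolver
        double roundTripMs = (2 * oneWayMeters / fiberMetersPerSecond) * 1000;
        System.out.println("Best-case round trip: " + roundTripMs + " ms"); // prints 1.0
    }
}
```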

The solution is quite simple: move the DNS resolver as close to the end user as possible. That’s partly why Cloudflare has built out (and continues to grow) our network. Today it stands at 250 cities worldwide.

Aside: actually it’s not “quite simple” because there’s another wrinkle. You can put servers all over the globe, but you also have to hook them up to the Internet. The beauty of the Internet is that it’s a network of networks. That also means that you don’t just plug into the Internet and get the lowest latency, you need to connect to multiple ISPs, transit networks and more so that end users, whatever network they use, get the best waitless experience they want.

That’s one reason why Cloudflare’s network isn’t simply all over the world, it’s also one of the most interconnected networks.

So far, in building the waitless Internet, we’ve identified fast DNS resolvers and fast authoritative DNS as two needs. What’s next?

Hello. Hello. OK.

So your web browser knows the IP address of Silent Space Marine and got it quickly. Great. Next step is for it to ask the web server at that IP address for the web page. Not so fast! The first step is to establish a connection to that server.

This is almost always done using a protocol called TCP that was invented in the 1970s. The very first step is for your computer and the server to agree they want to communicate. This is done with something called a three-way handshake.

Your computer sends a message saying, essentially, “Hello”, the server replies “I heard you say Hello” (that’s one round trip) and then your computer replies “I heard you say you heard me say Hello, so now we can chat” (actually it’s SYN then SYN-ACK and then ACK).

So, at least one speed-of-light troubled round trip has occurred. How do we fight the speed of light? We bring the server (in this case, web server) close to the end user. Yet another reason for Cloudflare’s massive global network and high interconnectedness.

Now the web browser can ask the web server for the web page of Silent Space Marine, right? Actually, no. The problem is we don’t just need a fast Internet we also need one that’s secure and so pretty much everything on the Internet uses an encryption protocol called TLS (which some old-timers will call SSL) and so next a secure connection has to be established.

Aside: astute readers might be wondering why I didn’t mention security in the DNS section above. Yep, you’re right, that’s a whole other wrinkle. DNS also needs to be secure (and fast) and resolvers like 1.1.1.1 support the encrypted DNS standards DoH and DoT. Those are built on top of… TLS. So in order to have fast, secure DNS you need the same thing as fast, secure web, and that’s fast TLS.

Oh, and by the way, you don’t want to get into some silly trade off between security and speed. You need both, which is why it’s helpful to use a service provider, like Cloudflare, that does everything.

Is this line secure?

TLS is quite a complicated protocol involving a web browser and a server establishing encryption keys and at least one of them (typically the web server) proving that they are who they purport to be (you wouldn’t want a secure connection to your bank’s website if you couldn’t be sure it was actually your bank).

The back and forth of establishing the secure connection incurs more hits on the speed of light. And so, once again, having servers close to end users is vital. And having really fast encryption software is vital too. Especially since encryption will need to happen on a variety of devices (think an old phone vs. a brand new laptop).

So, staying on top of the latest TLS standard is vital (we’re currently on TLS 1.3), and implementing all the tricks that speed TLS up is important (such as session resumption and 0-RTT resumption), and making sure your software is highly optimized.

So far, getting to a waitless Internet has involved fast DNS resolvers, fast authoritative DNS, being close to end users for fast TCP handshakes, and optimized TLS using the latest protocols. And we haven’t even asked the web server for the page yet.

If you’ve been counting round trips we’re currently standing at four: one for DNS, one for TCP, two for TLS. Lots of opportunity for the speed of light to be a problem, but also lots of opportunity for wider Internet problems to cause a slow-down.

Skybird, this is Dropkick with a red dash alpha message in two parts

Actually, before we let the web browser finally ask for the web page there are two things we need to worry about. And both are to do with when things go wrong. You may have noticed that sometimes the Internet doesn’t work right. Sometimes it’s slow.

The slowness is usually caused by two things: congestion and packet loss. Dealing with those is also vital to giving the end user the fastest experience possible.

In ancient times, long before the dawn of history, people used to use telephones that had physical wires connected to them. Those wires connected to exchanges and literal electrical connections were made between two phones over long distances. That scaled pretty well for a long time until a bunch of packet heads came along in the 1960s and said “you know you could create a giant shared network and break all communication up into packets and share the network”. The Internet.

But when you share something you can also get congestion and congestion control is a huge part of ensuring that the Internet is shared equitably amongst users. It’s one of the miracles of the Internet that theory done in the 1970s and implemented in the 1980s has allowed the network to support real time gaming and streaming video while allowing simultaneous chat and web browsing.

The flip side of congestion control is that in order to prevent a user from overwhelming the network you have to slow them down. And we’re trying to be as fast as possible! Actually, we need to be as fast as possible while remaining fair.

And congestion control is closely related to packet loss because one way that servers and browsers and computers know that there’s congestion is when their packets get lost.

We stay on top of the latest congestion control algorithms (such as BBR) so that users get the fastest, fairest possible experience. And we do something else: we actively try to work around packet loss.

Technologies like Argo and our private fiber backbone help us route around bad Internet weather that’s causing packet loss and send connections over dedicated fiber optic links that span the globe.

More on that in the coming week.

It’s happening!

And so, finally your web browser asks the web server for the web page with an innocent looking GET / command. And the web server responds with a big blob of HTML and just when you thought things were going to be simple, they are super complicated.

The complexity comes from two places: the HTTP protocol is now on its third major version, and the content of web pages is under the control of the designer.

First, protocols. HTTP/2 and HTTP/3 both provide significant speedups for web sites by introducing parallel request/response handling, better compression and ways to work around congestion and packet loss. Although HTTP/1.1 is still widely used, these newer protocols are the majority of traffic.

Cloudflare Radar shows HTTP/1.1 has dropped into the 20% range globally.

As people upgrade to recent browsers on their computers and devices the new protocols become more and more important. Staying on top of these, and optimizing them is vital as part of the waitless Internet.

And then comes the content of web pages. Images are a vital part of the web and delivering optimized images right-sized and right-formatted for the end user device plays a big part in a fast web.

But before the web browser can start loading the images it has to get and understand the HTML of the web page. This is wasteful as the browser could be downloading images (and other assets like fonts or JavaScript) while still processing the HTML if it knew about them in advance. The solution to that is for the web server to send a hint about what’s needed along with the HTML.

More on that in the coming week.

Imagical

One of the largest categories of content we deliver for our customers consists of static and animated images. And they are also a ripe target for optimization. Images tend to be large and take a while to download and there are a vast variety of end user devices. So getting the right size and format image to the end user really helps with performance.

Getting it there at the right time also means that images can be loaded lazily and only when the user scrolls them into visibility.

But, traditionally, handling different image formats (especially as new ones like WebP and AVIF get invented), different device types (think of all the different screen sizes out there), and different compression schemes has been a mess of services.

And chained services for different aspects of the image pipeline can be slow and expensive. What you really want is simple storage and an integrated way to deliver the right image to the end user tailored just for them.

More on that in the coming week.

Cache me if you can

As I mentioned in the section about DNS, a few thousand words ago, caching is really powerful and caching content near the end user is super powerful. Cloudflare makes extensive use of caching (particularly of images but also things like GraphQL) on its servers. This makes our customers’ websites fast as images can be delivered quickly from servers near the end user.

But it introduces a problem. If you have a lot of servers around the world then the caches need to be filled with content in order for it to be ready for end users. And the more servers you add the harder it gets to keep them all filled. You want the ‘cache hit ratio’ (how often content is served from cache without having to go back to the customer’s server) to be as high as possible.

But if you’ve got the content cached in Casablanca, and a user visits your website in Chennai they won’t have the fastest content delivery. To solve this some service providers make a deliberate decision not to have lots of servers near end users.

Sounds a bit crazy but their logic is “it’s hard to keep all those caches filled in lots of cities, let’s have only a few cities”. Sad. We think smart software can solve that problem and allow you to have your cache and eat it. We’ve used smart software to solve global load balancing problems and are doing the same for global cache. That way we get high cache hit ratios, super low latency to end users and low load on customer web servers.

More on that in the coming week.

Zero Cool

You know what’s cooler than a millisecond? Zero milliseconds.

Back in 2017 Cloudflare launched Workers, our serverless/edge computing platform. Four years on Workers is widely used and entire companies are being built on the technology. We added support for a variety of languages (such as COBOL and Rust), a distributed key-value store, Durable Objects, WebSockets, Cron Triggers and more.

But people were often concerned about cold start times because they were thinking about other serverless platforms that had significant spool up times for code that wasn’t ready to run.

Last year we announced that we eliminated cold starts from Workers. You don’t have to worry. And we’ll go deeper into why Cloudflare Workers is the fastest serverless platform out there.

More on that in the coming week.

And finally…

If you run a large global network and want to know if it’s really the fastest there is, and where you need to do work to keep it fast, the only way is to measure. Although there are third-party measurement tools available they can suffer from biases and their methodology is sometimes unclear.

We decided the only way we could understand our performance vs. other networks was to build our own like-for-like testing tool and measure performance across the Internet’s 70,000+ networks.

We’ll also talk about how we keep everything fast, from lightning quick configuration updates and code deploys to logs you don’t have to wait for to ludicrously fast cache purges to real time analytics.

More on that in the coming week.

Welcome to Speed Week*

*Can’t wait for tomorrow? Go play Silent Space Marine. It uses the technologies mentioned above.

Open Sourcing Bullet, Yahoo’s Forward-Looking Query Engine for Streaming Data

Post Syndicated from mikesefanov original https://yahooeng.tumblr.com/post/161855616651


By Michael Natkovich, Akshai Sarma, Nathan Speidel, Marcus Svedman, and Cat Utah

Big Data is no longer just Apache server logs. Nowadays, the data may be user engagement data, performance metrics, IoT (Internet of Things) data, or something else completely atypical. Regardless of the size of the data, or the type of querying patterns on it (exploratory, ad-hoc, periodic, long-term, etc.), everyone wants queries to be as fast as possible and cheap to run in terms of resources. Data can be broadly split into two kinds: the streaming (generally real-time) kind or the batched-up-over-a-time-interval (e.g., hourly or daily) kind. The batch version is typically easier to query since it is stored somewhere like a data warehouse that has nice SQL-like interfaces or an easy to use UI provided by tools such as Tableau, Looker, or Superset. Running arbitrary queries on streaming data quickly and cheaply though, is generally much harder… until now. Today, we are pleased to share our newly open sourced, forward-looking general purpose query engine, called Bullet, with the community on GitHub.

With Bullet, you can: 

  • Perform powerful and nested filtering
  • Fetch raw data records
  • Aggregate data using Group Bys (Sum, Count, Average, etc.), Count Distincts, and Top Ks
  • Get distributions of fields, like Percentiles or Frequency histograms

One of the key differences between how Bullet queries data and the standard querying paradigm is that Bullet does not store any data. In most other systems where you have a persistence layer (including in-memory storage), you are doing a look-back when you query the layer. Instead, Bullet operates on data flowing through the system after the query is started – it’s a look-forward system that doesn’t need persistence. On a real-time data stream, this means that Bullet is querying data after the query is submitted. This also means that Bullet does not query any data that has already passed through the stream. The fact that Bullet does not rely on a persistence layer is exactly what makes it extremely lightweight and cheap to run. 

To see why this is better for the kinds of use cases Bullet is meant for – such as quickly looking at some metric, checking some assumption, iterating on a query, checking the status of something right now, etc. – consider the following: if you had 1,000 queries in a traditional query system that operated on the same data, these query systems would most likely scan the data 1,000 times. By the very virtue of being forward-looking, 1,000 queries in Bullet scan the data only once because the arrival of the query determines and fixes the data that it will see. Essentially, the data is coming to the queries instead of the queries being farmed out to where the data is. When the conditions of the query are satisfied (usually a time window or a number of events), the query terminates and returns you the result.

A Brief Architecture Overview

High Level Bullet Architecture

The Bullet architecture is multi-tenant, can scale linearly for more queries and/or more data, and has been tested to handle 700+ simultaneous queries on a data stream that had up to 1.5 million records per second, or 5-6 GB/s. Bullet is currently implemented on top of Storm and can be extended to support other stream processing engines as well, like Spark Streaming or Flink. Bullet is pluggable, so you can plug in any source of data that can be read in Storm by implementing a simple data container interface to let Bullet work with it. 

The UI, web service, and the backend layers constitute your standard three-tier architecture. The Bullet backend can be split into three main subsystems:

  1. Request Processor – receives queries, adds metadata, and sends it to the rest of the system
  2. Data Processor – reads data from an input stream, converts it to a unified data format, and matches it against queries
  3. Combiner – combines results for different queries, performs final aggregations, and returns results 

The web service can be deployed on any servlet container, like Jetty. The UI is a Node-based Ember application that runs in the client browser. Our full documentation contains all the details on exactly how we perform computationally-intractable queries like Count Distincts on fields with cardinality in the millions, etc. (DataSketches). 

Usage at Yahoo 

An instance of Bullet is currently running at Yahoo in production against a small subset of Yahoo’s user engagement data stream. This data is roughly 100,000 records per second and is about 130 MB/s compressed. Bullet queries this with about 100 CPU Virtual Cores and 120 GB of RAM. This fits on less than 2 of our (64 Virtual Cores, 256 GB RAM each) test Storm cluster machines. 

One of the most popular use cases at Yahoo is to use Bullet to manually validate the instrumentation of an app or web application. Instrumentation produces user engagement data like clicks, views, swipes, etc. Since this data powers everything we do from analytics to personalization to targeting, it is absolutely critical that the data is correct. The usage pattern is generally to: 

  1. Submit a Bullet query to obtain data associated with your mobile device or browser (filter on a cookie value or mobile device ID)
  2. Open and use the application to generate the data while the Bullet query is running
  3. Go back to Bullet and inspect the data 

In addition, Bullet is also used programmatically in continuous delivery pipelines for functional testing instrumentation on product releases. Product usage is simulated, then data is generated and validated in seconds using Bullet. Bullet is orders of magnitude faster to use for this kind of validation and for general data exploration use cases, as opposed to waiting for the data to be available in Hive or other systems. The Bullet UI supports pivot tables and a multitude of charting options that may speed up analysis further compared to other querying options. 

We also use Bullet to do a bunch of other interesting things, including instances where we dynamically compute the cardinality of fields (using a Count Distinct Bullet query) as a check to protect systems, like Druid, that can’t support extremely high-cardinality fields.

What you do with Bullet is entirely determined by the data you put it on. If you put it on data that is essentially some set of performance metrics (data center statistics for example), you could be running a lot of queries that find the 95th and 99th percentile of a metric. If you put it on user engagement data, you could be validating instrumentation and mostly looking at raw data. 

We hope you will find Bullet interesting and tell us how you use it. If you find something you want to change, improve, or fix, your contributions and ideas are always welcome! You can contact us here

Helpful Links