Future of Ubuntu MATE

Post Syndicated from jzb original https://lwn.net/Articles/1077221/

Thomas Ward has published
an update about the future of the Ubuntu MATE project, which did not have a
26.04 release with the other Ubuntu flavors in
April:

There is a new team working on Ubuntu MATE who have stepped up to
help take over flavor management. They haven’t formally introduced
themselves yet, but I can safely say that other developers HAVE
stepped up for the future of the MATE flavor, despite its prior team
lead having stepped down.

[…] Ultimately, this means that they are working to cover the
missed items and gaps, and may quite possibly have a 26.10 release in
October of 2026, which I believe they most likely are targeting.

This also means that bugs in the MATE environment and in packages
they normally would have shipped had they have a 26.04 release are
still going to get attention and fixes. So, effectively, nothing has
changed. The only difference is that there was no 26.04 installer
image released.

For those looking to install a MATE desktop on a “clean” install of
Ubuntu 26.04, Ward suggests installing Ubuntu Server and then
installing the ubuntu-mate-desktop package.

[$] Eliminating long-lived credentials with trusted publishing

Post Syndicated from jzb original https://lwn.net/Articles/1076205/

Trusted publishing is an authentication mechanism that relies on
short-lived credentials to reduce the risk of supply-chain attacks. At
the 2026 Open Source Summit North America, Mike Fiedler walked the
audience through why trusted publishing exists and how it works, and
made the case for its adoption. It is not a silver bullet against all
attacks, but it does offer protection against theft of long-lived
credentials used to publish to package registries.

Anthropic Claude Fable 5 on AWS: Mythos-class capabilities with built-in safeguards now available

Post Syndicated from Channy Yun (윤석찬) original https://aws.amazon.com/blogs/aws/anthropic-claude-fable-5-on-aws-mythos-class-capabilities-with-built-in-safeguards-now-available/

Today, we’re announcing the availability of Claude Fable 5 on Amazon Bedrock and Claude Platform on AWS. Claude Fable 5 makes Mythos-level capabilities available to customers, with strong safeguards designed to make it safe for broader use. Fable 5 is state-of-the-art on nearly all tested benchmarks and delivers exceptional performance in software engineering, knowledge work tasks, and vision – built for ambitious, long running work.

With Claude Fable 5 on Bedrock, you can build within your existing AWS environment and scale inference workloads. You can also use Claude Fable 5 through the Claude Platform on AWS, giving you Anthropic’s native platform experience.

According to Anthropic, Claude Fable 5 represents a step-change in what you can accomplish with AI models. Here is what makes this model different:

  • Long-running, asynchronous execution — Claude Fable 5 handles complex tasks that previous models could not sustain, executing coding and knowledge work tasks for extended periods without intervention.
  • Advanced vision capabilities — Claude Fable 5 understands diagrams, charts, and tables nested in files and PDFs. This opens up research and document-heavy work in finance, legal, analytics, architecture, and gaming. In coding, the model implements designs with high fidelity and uses vision to critique its output against goals.
  • Proactive self-verification — The model self-updates skills based on learnings, develops its own harnesses and evaluations.

Claude Fable 5 includes safeguards that limit its performance in specific areas where misuse risk is elevated. Harmful prompts related to cybersecurity, biology, chemistry, and health fall back to receive a response from Opus 4.8 instead. Anthropic is able to expand access to nearly all of Claude Fable 5’s state-of-the-art capabilities by developing more powerful safeguards. The same model without these limits is Claude Mythos 5 and it will only be available to a small group of vetted customers.

Claude Fable 5 model in action
You can use Claude Fable 5 in both Amazon Bedrock and Claude Platform on AWS. This post covers how to access and use the model on Amazon Bedrock. For guidance on the Claude Platform on AWS, visit the documentation.

To get started with Amazon Bedrock, you can access the model programmatically using the Anthropic Messages API to call the bedrock-runtime or bedrock-mantle endpoints through the Anthropic SDK. You can also keep using the Invoke and Converse APIs on bedrock-runtime through the AWS Command Line Interface (AWS CLI) and AWS SDK.

To access the Claude Fable 5 model, you must first opt into data sharing by using the Data Retention API and setting provider_data_sharing. You cannot invoke the model until you have done so. There is no console user interface for this setting at launch.

curl -X PUT https://bedrock-mantle.us-east-1.api.aws/v1/data_retention \
  -H "x-api-key: <your-bedrock-api-key>" \
  -H "Content-Type: application/json" \
  -d '{ "mode": "provider_data_share" }'

This mode allows Amazon Bedrock to retain and share your inference data with model providers per their requirements. Anthropic requires 30-day retention of inputs and outputs, as well as human review. To learn more, see the Amazon Bedrock abuse detection documentation.
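If your setup scripts are already written in Python, the same opt-in call can be sketched with the standard library. This is a minimal sketch that mirrors the curl command above: the helper name build_data_retention_request is made up for illustration, and the endpoint, header, and payload are copied from that command rather than verified independently.

```python
import json
import urllib.request

# Endpoint copied from the curl example; adjust the Region as needed.
DATA_RETENTION_URL = "https://bedrock-mantle.us-east-1.api.aws/v1/data_retention"


def build_data_retention_request(api_key: str) -> urllib.request.Request:
    """Build (but do not send) the PUT request that opts into data sharing."""
    body = json.dumps({"mode": "provider_data_share"}).encode("utf-8")
    return urllib.request.Request(
        DATA_RETENTION_URL,
        data=body,
        method="PUT",
        headers={"x-api-key": api_key, "Content-Type": "application/json"},
    )


request = build_data_retention_request("<your-bedrock-api-key>")
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```

Building the request separately from sending it lets you inspect or log the call before any data-sharing setting changes.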

Let’s start with the Anthropic SDK for Python, using the Messages API on the bedrock-mantle endpoint. First, install the Anthropic SDK.

pip install anthropic

Here is sample Python code to call the Claude Fable 5 model:

import anthropic

# Point the Anthropic SDK at the bedrock-mantle endpoint.
client = anthropic.Anthropic(
    base_url="https://bedrock-mantle.us-east-1.api.aws/anthropic",
    api_key="<your-bedrock-api-key>",
)

message = client.messages.create(
    model="anthropic.claude-fable-5",
    max_tokens=4096,
    messages=[
        {
            "role": "user",
            "content": "Design a distributed architecture on AWS in Python that should support 100k requests per second across multiple geographic regions",
        },
    ],
)

# Print the text of the first content block in the reply.
print(message.content[0].text)

To learn more, check out Anthropic Messages API code examples and notebook examples for multiple use cases and a variety of programming languages.

You can also use Claude Fable 5 with the Invoke API and Converse API on the bedrock-runtime endpoint. Here’s an example that calls the Converse API for a unified multi-model experience using the AWS SDK for Python (Boto3):

import boto3 
bedrock_runtime = boto3.client("bedrock-runtime", region_name="us-east-1") 
response = bedrock_runtime.converse( 
    modelId="us.anthropic.claude-fable-5", 
    messages=[ 
        { 
            "role": "user", 
            "content": [ 
                { 
                    "text": "Design a distributed architecture on AWS in Python that should support 100k requests per second across multiple geographic regions." 
                } 
            ] 
        } 
    ], 
    inferenceConfig={ 
        "maxTokens": 4096 
    } 
) 
print(response["output"]["message"]["content"][0]["text"]) 

To learn more, visit code examples that show how to use Amazon Bedrock Runtime with AWS SDKs.
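The Converse example above prints only the first content block. Because the content field is a list, a reply can in principle span several blocks, so a small helper that joins all text blocks may be safer. This is a sketch, not part of the AWS SDK: extract_text is a hypothetical name, and sample_response is a hand-written stand-in that only imitates the response shape used above.

```python
def extract_text(response: dict) -> str:
    """Join every text block in a Converse-style response into one string."""
    blocks = response["output"]["message"]["content"]
    # Skip blocks that carry something other than text.
    return "".join(block["text"] for block in blocks if "text" in block)


# Hand-written stand-in for a real response, used only to exercise the helper.
sample_response = {
    "output": {
        "message": {
            "role": "assistant",
            "content": [
                {"text": "Use multi-Region "},
                {"text": "active-active routing."},
            ],
        }
    },
    "stopReason": "end_turn",
}

print(extract_text(sample_response))
```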

Things to know
Let me share some important technical details that I think you’ll find useful.

  • Model access — Claude Fable 5 access is gradually expanding for all AWS accounts. If your account doesn’t have access yet, it will be enabled soon depending on your Bedrock usage. If you want to get access to this model quickly, contact your usual AWS Support.
  • Pricing — When a harmful prompt is routed to Opus 4.8 instead of Fable 5, you pay only Opus prices. If a request is blocked mid-conversation, initial tokens are charged at Fable rates and subsequent tokens at Opus rates. To learn more, visit the Amazon Bedrock pricing page.
  • Data retention — For Fable 5, Mythos 5, and future models on Bedrock with similar or higher capability levels, Anthropic will require 30-day retention for all traffic on Mythos-class models. Retaining data for a limited period allows Anthropic to detect patterns of misuse that are not visible from a single exchange. Once you opt into data retention, your data will leave AWS’s data and security boundary.
  • Claude Mythos 5 on Bedrock (Limited Preview) — You can also use Anthropic’s most capable model for cybersecurity and life sciences, including vulnerability discovery, drug design, and biodefense screening. Access is currently limited due to the dual-use nature of these domains. To learn more, visit the model card documentation.
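The mid-conversation pricing rule in the list above is simple arithmetic, and a short sketch can make it concrete. The rates below are made-up placeholders in arbitrary units, not real prices, and mid_conversation_cost is a hypothetical helper name; check the Amazon Bedrock pricing page for actual figures.

```python
def mid_conversation_cost(initial_tokens, later_tokens, fable_rate, opus_rate):
    """Cost when a request is blocked partway: Fable rate first, Opus rate after."""
    return initial_tokens * fable_rate + later_tokens * opus_rate


# Placeholder per-token rates in arbitrary units, NOT real prices.
FABLE_RATE = 2
OPUS_RATE = 1

cost = mid_conversation_cost(
    initial_tokens=1_000,  # tokens processed before the block
    later_tokens=3_000,    # tokens processed after falling back
    fable_rate=FABLE_RATE,
    opus_rate=OPUS_RATE,
)
print(cost)
```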

Now available
Anthropic’s Claude Fable 5 model is available today on Amazon Bedrock in the US East (N. Virginia) and Europe (Stockholm) Regions; check the full list of Regions for future updates. Claude Fable 5 is also available on the Claude Platform on AWS in North America, South America, Europe, and Asia Pacific.

Give Claude Fable 5 a try with the Amazon Bedrock APIs or in the Claude Platform on AWS, and send feedback to AWS re:Post for Amazon Bedrock or through your usual AWS Support contacts.

Channy

Updated on June 9, 2026 — You can use the console on bedrock-runtime engine. The console support on bedrock-mantle is coming soon.

Beyond JSON blobs: Implementing the VARIANT data type in Apache Iceberg V3

Post Syndicated from Arun Shanmugam original https://aws.amazon.com/blogs/big-data/beyond-json-blobs-implementing-the-variant-data-type-in-apache-iceberg-v3/

Apache Iceberg V3 introduces the VARIANT data type. VARIANT provides data engineers with a high-performance, native solution for managing semi-structured data within the data lake. Consider a massive fleet of IoT sensors: street-level temperature probes, air quality monitors, and vehicle telemetry. Each device emits data in unique JSON structures that constantly evolve with firmware updates.

Historically, engineers were forced to store these payloads as STRING blobs. This legacy approach mandates expensive CPU-intensive parsing at runtime and inflates storage costs with redundant raw text. VARIANT solves these inefficiencies by employing a shredded, binary-encoded format. This allows query engines to skip irrelevant data and access specific nested fields with columnar speed, effectively bridging the gap between the flexibility of JSON and the performance of a structured schema.

VARIANT is stored in Parquet as a three-part group: binary metadata (type and dictionary info), a binary value (the full variant for fallback), and a typed_value group where individual JSON fields are shredded into separate Parquet columns. When you query a specific field, Spark prunes the typed_value group to include only the requested sub-columns. It always retains metadata and the value fallback, so it avoids reading the entire document. This approach delivers two concrete benefits:

  • Reduced query processing time: Queries access only the fields they need without deserializing entire JSON documents. This reduces the amount of data scanned and the time spent on deserialization.
  • Lower storage footprint: Binary encoding compresses more efficiently than raw text, reducing storage costs.

Fields inside the JSON become individually accessible columns under the hood. A query that needs one value out of a deeply nested document no longer must read and deserialize the entire thing. You maintain schema flexibility while gaining the performance characteristics of structured columnar storage.

This post is part 1 of a two-part series. We walk through the basics: creating an Iceberg V3 table with a VARIANT column, inserting semi-structured data, and querying it with variant_get(). In Part 2, we scale to millions of rows and benchmark VARIANT against traditional string storage. We measure the difference in query performance and storage footprint.

Solution overview

This walkthrough demonstrates an end-to-end workflow for working with semi-structured data using the VARIANT data type in Apache Iceberg V3 on Amazon EMR Serverless. Raw JSON payloads are ingested and converted to binary VARIANT format using parse_json(). The data is stored in an Iceberg V3 table where the engine shreds the structure into columnar Parquet sub-columns. You can then query the data efficiently using variant_get() to extract specific fields without deserializing the entire document. AWS Glue Data Catalog manages the table metadata. Amazon Simple Storage Service (Amazon S3) provides the underlying storage.

Note: Check the Apache Iceberg documentation for the latest information on specification status and engine compatibility. Additionally, Fine-Grained Access Control (FGAC) through AWS Lake Formation is not currently supported for the VARIANT data type.

How VARIANT works

When you insert a JSON document into a VARIANT column, Spark converts it from a JSON string into the Variant binary format. During writes, the engine can shred the structure. It extracts individual fields and stores them as native Parquet-typed sub-columns within the VARIANT column’s typed_value group. Fields that are not shredded remain in the binary value column as a fallback. This is conceptually similar to how a columnar table stores each column independently. The difference is that the sub-columns live within a single VARIANT column, and the engine handles the shredding schema automatically.
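To build intuition for shredding, here is a toy model in plain Python. It is only a conceptual sketch: the real format is binary, the engine chooses the shredding schema, and nested fields can be shredded too, whereas this version handles top-level keys and uses a JSON string as a stand-in for the binary fallback. The names shred and read_field are hypothetical.

```python
import json


def shred(document: dict, shredded_fields: set) -> dict:
    """Split a document into shredded columns and a residual fallback value."""
    typed_value = {k: v for k, v in document.items() if k in shredded_fields}
    residual = {k: v for k, v in document.items() if k not in shredded_fields}
    return {
        # Stands in for the typed Parquet sub-columns.
        "typed_value": typed_value,
        # Stands in for the binary fallback holding unshredded fields.
        "value": json.dumps(residual, sort_keys=True),
    }


def read_field(row: dict, field: str):
    """Read one field, decoding the fallback only when it was not shredded."""
    if field in row["typed_value"]:
        return row["typed_value"][field]
    return json.loads(row["value"]).get(field)


row = shred(
    {"temperature": 22.5, "humidity": 61.3, "note": "ok"},
    {"temperature", "humidity"},
)
print(read_field(row, "temperature"))  # served from a shredded sub-column
print(read_field(row, "note"))         # served from the fallback value
```

Reading temperature never touches the fallback, which is the property that lets a query engine skip the rest of the document.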

At query time, when you ask for a specific field using variant_get(), Spark reads only the sub-column that contains that field. It does not need to load or parse the rest of the document. For workloads that repeatedly query a handful of fields out of large, complex JSON payloads, this can significantly reduce the amount of data scanned. It also reduces the time spent deserializing it.

The variant_get() function uses JSON path syntax to navigate the structure. You can extract scalar values with an explicit type (optional), access nested objects, and reach into arrays by index. The function signature is the following.

variant_get(column, '$.path.to.field', 'type')

Where column is the VARIANT column name, the second argument is a JSON path expression, and the optional third argument specifies the expected return type (such as 'string', 'int', or 'double'). When the type argument is omitted, the function returns a VARIANT value that preserves the original encoding.
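If the path syntax is unfamiliar, the following toy resolver shows how a path such as '$.alerts[0].severity' walks a nested document. It is a plain-Python illustration of the path semantics only, not how Spark evaluates variant_get(): resolve_path is a hypothetical name, it supports just dotted keys and numeric array indexes, and returning None for a missing step is a choice made for this sketch.

```python
import re

# One path step: either ".key" or "[index]".
_TOKEN = re.compile(r"\.([A-Za-z_][A-Za-z0-9_]*)|\[(\d+)\]")


def resolve_path(document, path: str):
    """Follow a '$.a.b[0].c' style path; return None when any step is missing."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    current = document
    position = 1
    while position < len(path):
        match = _TOKEN.match(path, position)
        if match is None:
            raise ValueError(f"unsupported path syntax at offset {position}")
        key, index = match.group(1), match.group(2)
        if key is not None:
            if not isinstance(current, dict) or key not in current:
                return None
            current = current[key]
        else:
            i = int(index)
            if not isinstance(current, list) or i >= len(current):
                return None
            current = current[i]
        position = match.end()
    return current


event = {"sensors": {"temperature": 22.5}, "alerts": [{"severity": "low"}]}
print(resolve_path(event, "$.sensors.temperature"))
print(resolve_path(event, "$.alerts[0].severity"))
print(resolve_path(event, "$.alerts[3].severity"))
```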

Running Iceberg V3 on Amazon EMR Serverless

Amazon EMR Serverless 8.0 ships with Apache Spark 4.0.1, which includes native support for Iceberg V3 and the VARIANT data type. You do not need to install additional libraries or configure custom JARs. Amazon EMR Serverless manages the compute infrastructure and scales resources up and down based on workload demand. You can focus on the data rather than the cluster.

While this post uses Amazon EMR Serverless, Iceberg V3 VARIANT support is also available on Amazon EMR on EC2 and Amazon EMR on EKS. You can choose the deployment model that fits your environment.

Getting started

The following walkthrough creates an Iceberg V3 table with a VARIANT column, inserts a set of IoT sensor events, and runs queries to extract fields from the semi-structured payload. Each step includes the code you need to run it on Amazon EMR Serverless.

Prerequisites

Before you begin, verify you have the following:

  • An AWS account with permissions to create Amazon EMR Serverless applications and access Amazon Simple Storage Service (Amazon S3).
  • An Amazon S3 bucket for storing Iceberg table data and scripts.
  • AWS Glue Data Catalog configured for metadata management.
  • An IAM execution role with permissions for Amazon EMR Serverless, Amazon S3, AWS Glue, and Amazon CloudWatch Logs.
  • AWS Command Line Interface (AWS CLI) installed and configured.

Note: Running this solution in your AWS account might incur charges for Amazon EMR Serverless, Amazon S3, and AWS Glue. Refer to the respective pricing pages for cost details.

Step 1: Initialize a Spark session with Iceberg V3

Start by creating a Spark session configured to use the Iceberg catalog backed by AWS Glue. The key settings are the Iceberg Spark extensions and the AWS Glue catalog implementation. Replace <YOUR_S3_BUCKET> with your bucket name.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, parse_json

spark = SparkSession.builder \
    .appName("IcebergV3VariantDemo") \
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.glue_catalog",
            "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.warehouse",
            "s3://<YOUR_S3_BUCKET>/warehouse/") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.io-impl",
            "org.apache.iceberg.aws.s3.S3FileIO") \
    .getOrCreate()

When running on Amazon EMR Serverless, some Spark configurations might be set at the application or job level. The configuration shown here is included in the script for completeness. Depending on your Amazon EMR Serverless application settings, you might not need to specify all these properties in the script.
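If you prefer to keep these settings out of the script, one option is to hold them in a dictionary and render them as --conf flags for the job submission. The sketch below assumes that approach: iceberg_glue_settings and as_spark_params are hypothetical helper names, and the setting names and values are the ones from Step 1.

```python
def iceberg_glue_settings(catalog: str, warehouse: str) -> dict:
    """Return the Spark settings from Step 1, keyed by configuration name."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    }


def as_spark_params(settings: dict) -> str:
    """Render settings as a single string of --conf key=value flags."""
    return " ".join(f"--conf {key}={value}" for key, value in settings.items())


settings = iceberg_glue_settings("glue_catalog", "s3://<YOUR_S3_BUCKET>/warehouse/")
print(as_spark_params(settings))
```

The same dictionary can also be applied in the script by looping over its items and calling .config(key, value) on the session builder, so both paths stay in sync.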

Step 2: Create an Iceberg V3 table with a VARIANT column

Create a namespace and table. The format version must be set to 3 for VARIANT data type support. The following table models IoT sensor events with a few standard columns and a VARIANT column for the semi-structured payload.

spark.sql("CREATE NAMESPACE IF NOT EXISTS glue_catalog.iceberg_v3_demo")

spark.sql("""
CREATE TABLE IF NOT EXISTS glue_catalog.iceberg_v3_demo.sensor_events (
    event_id STRING,
    device_id STRING,
    event_timestamp TIMESTAMP,
    event_data VARIANT
)
USING iceberg
TBLPROPERTIES (
    'format-version' = '3'
)
""")

The event_data column is declared as VARIANT. Iceberg stores it in Parquet as a binary-encoded VARIANT structure (metadata, value, and optional shredded sub-columns) rather than as a plain text string.

Step 3: Insert semi-structured data

To insert JSON data into a VARIANT column, use the parse_json() function. This converts a JSON string into the binary VARIANT format at write time. The following example creates a small DataFrame of IoT events and appends them to the table.

import json
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType

# Sample IoT events with nested JSON payloads
events = [
    ("evt_001", "sensor_001", json.dumps({
        "device": {"manufacturer": "SensorTech", "model": "ST-200",
                   "firmware_version": "3.1.4"},
        "sensors": {"temperature": 22.5, "humidity": 61.3,
                    "air_quality": {"pm25": 12.4, "co2": 415}},
        "network": {"connection": "WiFi", "latency_ms": 42},
        "alerts": [{"severity": "low", "message": "Calibration due"}]
    })),
    ("evt_002", "sensor_002", json.dumps({
        "device": {"manufacturer": "IoTCorp", "model": "IC-500",
                   "firmware_version": "2.8.1"},
        "sensors": {"temperature": 34.1, "humidity": 78.9,
                    "air_quality": {"pm25": 142.7, "co2": 1850}},
        "network": {"connection": "LTE", "latency_ms": 210},
        "alerts": [{"severity": "critical",
                    "message": "Temperature threshold exceeded"},
                   {"severity": "high",
                    "message": "Poor air quality detected"}]
    })),
    ("evt_003", "sensor_003", json.dumps({
        "device": {"manufacturer": "SmartDevices", "model": "SD-100",
                   "firmware_version": "1.5.9"},
        "sensors": {"temperature": 18.7, "humidity": 45.2,
                    "air_quality": {"pm25": 8.1, "co2": 390}},
        "network": {"connection": "Ethernet", "latency_ms": 5},
        "alerts": []
    })),
]

schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("device_id", StringType(), False),
    StructField("event_data", StringType(), False),
])

df = spark.createDataFrame(events, schema)
df = df.withColumn("event_timestamp", current_timestamp())

# Convert JSON string to VARIANT using parse_json
df = df.withColumn("event_data", parse_json(col("event_data")))

df.writeTo("glue_catalog.iceberg_v3_demo.sensor_events").append()
print("Data inserted successfully.")

The parse_json() call is the key step. It takes the raw JSON string and encodes it into the binary VARIANT format before writing to the Iceberg table.
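Because parse_json() expects well-formed JSON, it can help to screen payloads before they reach the write. The following sketch assumes you want to set malformed payloads aside rather than let them reach Spark; split_valid_json is a hypothetical helper. Python's json module is slightly more permissive than strict JSON (it accepts NaN, for example), so treat this as a pre-check rather than a guarantee.

```python
import json


def split_valid_json(rows):
    """Partition (event_id, payload) pairs into parseable and unparseable lists."""
    valid, invalid = [], []
    for event_id, payload in rows:
        try:
            json.loads(payload)
        except (TypeError, ValueError):
            # JSONDecodeError is a ValueError; TypeError covers None payloads.
            invalid.append(event_id)
        else:
            valid.append((event_id, payload))
    return valid, invalid


rows = [
    ("evt_001", '{"sensors": {"temperature": 22.5}}'),
    ("evt_bad", '{"sensors": '),  # truncated payload
]
valid, invalid = split_valid_json(rows)
print(len(valid), invalid)
```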

Step 4: Query VARIANT data with variant_get()

Once the data is in the table, you can extract individual fields from the VARIANT column using variant_get(). The following queries demonstrate three common patterns: simple field extraction, deep nested access with filtering, and array element access.

The following queries are shown as raw SQL for readability. To run them in your PySpark script, wrap each query in a spark.sql() call. For example: spark.sql("SELECT ...").show().

Query 1: Simple field extraction

Extract top-level sensor readings from the payload.

SELECT
    event_id,
    device_id,
    variant_get(event_data, '$.sensors.temperature', 'double') AS temperature,
    variant_get(event_data, '$.sensors.humidity', 'double') AS humidity
FROM glue_catalog.iceberg_v3_demo.sensor_events

This query reads only the temperature and humidity sub-columns from the VARIANT data. It does not parse or load the rest of the JSON document.

Query 2: Deep nested access with filtering

Reach into nested objects and filter on a value buried inside the structure.

SELECT
    device_id,
    variant_get(event_data, '$.sensors.air_quality.pm25', 'double') AS pm25,
    variant_get(event_data, '$.sensors.air_quality.co2', 'int') AS co2_level,
    variant_get(event_data, '$.device.manufacturer', 'string') AS manufacturer
FROM glue_catalog.iceberg_v3_demo.sensor_events
WHERE variant_get(event_data, '$.sensors.air_quality.pm25', 'double') > 100.0

The WHERE clause filters directly on a nested VARIANT field. Spark evaluates the predicate against the shredded sub-column without deserializing the full payload.

Query 3: Array element access

Access elements inside a JSON array stored within the VARIANT column.

SELECT
    event_id,
    device_id,
    variant_get(event_data, '$.alerts[0].severity', 'string') AS first_alert_severity,
    variant_get(event_data, '$.alerts[0].message', 'string') AS first_alert_message
FROM glue_catalog.iceberg_v3_demo.sensor_events
WHERE variant_get(event_data, '$.alerts[0].severity', 'string') = 'critical'

Array indexing uses standard bracket notation in the JSON path. This query finds events where the first alert has critical severity and returns the alert details.

Figure 1: Query results showing simple field extraction, nested access with filtering, and array element access from the VARIANT column.
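All three queries repeat the same variant_get() pattern, so when you assemble them in a PySpark script a small string helper can reduce typos. This is a sketch under stated assumptions: variant_field is a hypothetical name, it only builds SQL text, and it rejects quotes instead of trying to escape them.

```python
def variant_field(column, path, sql_type=None, alias=None):
    """Build a variant_get(...) SQL fragment for one field."""
    if "'" in path or (sql_type is not None and "'" in sql_type):
        raise ValueError("quotes are not supported in this sketch")
    args = [column, "'" + path + "'"]
    if sql_type is not None:
        args.append("'" + sql_type + "'")
    expression = "variant_get(" + ", ".join(args) + ")"
    return expression + " AS " + alias if alias else expression


columns = [
    "event_id",
    variant_field("event_data", "$.sensors.temperature", "double", "temperature"),
    variant_field("event_data", "$.alerts[0].severity", "string", "first_alert_severity"),
]
query = (
    "SELECT " + ", ".join(columns)
    + " FROM glue_catalog.iceberg_v3_demo.sensor_events"
)
print(query)
# In the PySpark script: spark.sql(query).show()
```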

Submitting the job to Amazon EMR Serverless

To run this on Amazon EMR Serverless, save the preceding code as a single PySpark script (for example, iceberg_v3_variant_demo.py), upload it to Amazon S3, and submit it as a job. Replace the placeholder values with your own.

Before submitting the job, make sure you have created an Amazon EMR Serverless application. For instructions, see Getting started with Amazon EMR Serverless in the Amazon EMR documentation.

# Upload script to S3
aws s3 cp iceberg_v3_variant_demo.py \
    s3://<YOUR_S3_BUCKET>/scripts/ \
    --region <REGION>

# Submit the job
aws emr-serverless start-job-run \
    --application-id <APPLICATION_ID> \
    --execution-role-arn arn:aws:iam::<ACCOUNT_ID>:role/EMRServerlessExecutionRole \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://<YOUR_S3_BUCKET>/scripts/iceberg_v3_variant_demo.py"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
            "cloudWatchLoggingConfiguration": {
                "enabled": true,
                "logGroupName": "/aws/emr-serverless/applications/<APPLICATION_ID>"
            }
        }
    }' \
    --region <REGION>
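If you would rather submit the job from Python, the CLI flags above map onto keyword arguments for the Boto3 emr-serverless client. The sketch below separates building the request from sending it; build_job_run_request and submit are hypothetical helper names, and the role name and script path are the ones used in the CLI example.

```python
def build_job_run_request(application_id, account_id, bucket):
    """Assemble keyword arguments that mirror the CLI flags shown above."""
    return {
        "applicationId": application_id,
        "executionRoleArn": (
            f"arn:aws:iam::{account_id}:role/EMRServerlessExecutionRole"
        ),
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": f"s3://{bucket}/scripts/iceberg_v3_variant_demo.py"
            }
        },
        "configurationOverrides": {
            "monitoringConfiguration": {
                "cloudWatchLoggingConfiguration": {
                    "enabled": True,
                    "logGroupName": (
                        f"/aws/emr-serverless/applications/{application_id}"
                    ),
                }
            }
        },
    }


def submit(request, region):
    """Send the request; boto3 is imported here so the builder works without it."""
    import boto3

    client = boto3.client("emr-serverless", region_name=region)
    return client.start_job_run(**request)["jobRunId"]


request = build_job_run_request("<APPLICATION_ID>", "<ACCOUNT_ID>", "<YOUR_S3_BUCKET>")
print(request["jobDriver"]["sparkSubmit"]["entryPoint"])
# To submit for real: submit(request, "<REGION>")
```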

Use cases

VARIANT fits naturally into workloads where the data is semi-structured and the schema is not fully known in advance. Some use cases include the following:

  • IoT and sensor data: Device fleets produce telemetry in varying JSON formats that evolve with firmware updates. VARIANT stores these payloads without requiring a fixed schema, and queries can extract specific readings without scanning the entire document.
  • Clickstream analytics: User behavior events on websites and mobile apps carry different attributes depending on the action. Page views, clicks, form submissions, and purchases each have their own structure. VARIANT accommodates these data types in a single column.
  • Log analytics: Application logs, infrastructure metrics, and audit trails often arrive as unstructured or loosely structured JSON. VARIANT lets you ingest them as is and query specific fields on demand, without defining a schema up front.

Clean up

To avoid ongoing charges, delete the resources you created:

  • Drop the Iceberg table and namespace using Spark SQL.
    spark.sql("DROP TABLE IF EXISTS glue_catalog.iceberg_v3_demo.sensor_events")
    spark.sql("DROP NAMESPACE IF EXISTS glue_catalog.iceberg_v3_demo")

  • Stop and delete the Amazon EMR Serverless application.
    aws emr-serverless stop-application --application-id <APPLICATION_ID> --region <REGION>
    aws emr-serverless delete-application --application-id <APPLICATION_ID> --region <REGION>

  • Delete the S3 objects and bucket used for table data, scripts, and logs.
    aws s3 rm s3://<YOUR_S3_BUCKET>/warehouse/ --recursive
    aws s3 rm s3://<YOUR_S3_BUCKET>/scripts/ --recursive

Conclusion

Apache Iceberg V3’s VARIANT type provides an efficient way to store and query semi-structured data in your data lake. Columnar storage and shredding reduce storage costs, and direct field access through variant_get() removes the need to parse JSON strings at query time. On Amazon EMR Serverless, you get this capability without managing infrastructure.

In Part 2 of this series, we scale to millions of rows and benchmark VARIANT against traditional string storage. We measure query performance and storage footprint under realistic workloads.

To learn more about Apache Iceberg on AWS, see Apache Iceberg on AWS prescriptive guidance. For more information about Amazon EMR Serverless, see the Amazon EMR Serverless documentation.


About the authors

Arun Shanmugam

Arun is a Senior Analytics Solutions Architect at AWS, with a focus on building modern data architecture. He has been successfully delivering scalable data analytics solutions for customers across diverse industries. Outside of work, Arun is an avid outdoor enthusiast who actively engages in CrossFit, road biking, and cricket.

Suthan Phillips

Suthan is a Senior Analytics Architect at AWS, where he helps customers design and optimize scalable, high-performance data solutions that drive business insights. He combines architectural guidance on system design and scalability with best practices to provide efficient, secure implementation across data processing and experience layers. Outside of work, Suthan enjoys swimming, hiking, and exploring the Pacific Northwest.

Ron Ortloff

Ron Ortloff is a Principal Product Manager at AWS, where he focuses on Apache Iceberg, S3 Tables, and open data lakehouse solutions. He has over 15 years of experience building and leading data platform initiatives, including launching Azure Synapse Analytics at Microsoft and leading Iceberg and data lake strategy at Snowflake. When he’s not building data platforms, Ron can be found cheering on his favorite football and hockey teams.

Xiaoxuan Li

Xiaoxuan is a Software Development Engineer at AWS, working on the performance and scalability of Apache Iceberg in large-scale data lakehouse systems. Her interests span query optimization, storage-efficient architectures, and distributed data processing. Outside of work, she explores AI systems for creative storytelling and tooling for writers and content creators.

Automate medical record digitization with Amazon Bedrock Data Automation and AWS HealthLake

Post Syndicated from Gerardo Alarcon Rivas original https://aws.amazon.com/blogs/architecture/automate-medical-record-digitization-with-amazon-bedrock-data-automation-and-aws-healthlake/

Healthcare providers manage millions of paper medical records that remain disconnected from modern clinical systems. Clinicians make decisions without full patient histories, organizations spend millions on manual data entry, and critical information stays trapped in formats that modern applications can’t read. The technical challenge is clear: how do you transform unstructured, scanned documents into standardized, interoperable health data at scale, without building custom machine learning (ML) models or hand-coding document parsers for every form type?

In this post, you learn how to build an automated, serverless pipeline that converts scanned PDF medical records into FHIR R4-compliant data using Amazon Bedrock Data Automation and AWS HealthLake. We walk through the architecture, explain how each AWS service connects to the next, show you what the pipeline looks like when it runs, and get you deployed in under 20 minutes. For advanced configuration, troubleshooting, and customization options, see the GitHub repository.

The challenge with paper medical records

Healthcare organizations face a compounding problem. Paper records don’t just create storage challenges; they create care gaps. When a patient arrives at a new facility, clinicians often proceed with incomplete information because retrieving and interpreting historical records takes too long. Manual digitization is expensive, error-prone, and doesn’t scale.

The solution requires more than scanning documents. It requires extracting structured, clinically meaningful data and storing it in a format that integrates with existing systems. That’s where Fast Healthcare Interoperability Resources (FHIR) comes in. FHIR is the healthcare industry’s standard for exchanging electronic health information.

Solution overview

This solution uses an event-driven, serverless architecture to automate the full journey from PDF upload to queryable FHIR data. No custom machine learning models or manual template configuration are required.

AWS services used:

  • Amazon Bedrock Data Automation (BDA): Extracts over 50 structured clinical fields from scanned PDFs using advanced AI capabilities, including patient demographics, diagnoses with ICD-10 codes, medications, vital signs, and lab results.
  • AWS Lambda: Two serverless functions orchestrate the pipeline: a BDA Trigger function that fires when a PDF is uploaded, and a FHIR Processor function that converts extracted JSON into FHIR R4 format.
  • Amazon Simple Storage Service (Amazon S3): Input and output buckets with event notifications drive the pipeline automatically, with no polling or scheduled jobs required.
  • AWS HealthLake: A FHIR R4-compliant, HIPAA-eligible data store that validates, indexes, and exposes data through standard FHIR API endpoints.
  • AWS CloudFormation: Provisions the entire infrastructure as code in a single automated deployment (approximately 15–20 minutes).
  • Amazon CloudWatch and AWS CloudTrail: Provide end-to-end monitoring, logging, and audit trails across all pipeline components.
  • AWS Key Management Service (AWS KMS): Encrypts AWS HealthLake data at rest using customer managed keys.

Important: This solution is a demonstration sample designed for use with synthetic data only. It’s not production-ready for real Protected Health Information (PHI) without additional HIPAA security controls. See the Security considerations section before deploying in any environment with real patient data.

Architecture

Figure 1: End-to-end architecture showing the event-driven pipeline from PDF upload to FHIR-compliant data storage

The pipeline runs in three phases, each building on the last.

Phase 1: Infrastructure deployment

AWS CloudFormation provisions all required resources in a single stack: Amazon S3 input and output buckets, two Lambda functions, AWS Identity and Access Management (IAM) roles with least-privilege permissions, AWS KMS keys, CloudWatch log groups, and an AWS HealthLake FHIR R4 datastore. The entire environment, including all service-to-service permissions, is version-controlled and repeatable.

Phase 2: Event-driven data processing

The processing pipeline is fully event-driven. No scheduler or orchestration service is required. Each step triggers the next automatically:

  1. PDF Upload → S3 Input Bucket
  2. S3 Event → Triggers BDA Lambda function
  3. BDA Processing → Extracts over 50 clinical fields with confidence scores
  4. JSON Storage → S3 Output Bucket
  5. S3 Event → Triggers FHIR Processor Lambda function
  6. FHIR Conversion → Creates FHIR R4 Bundle (JSON + NDJSON)
  7. HealthLake Import → Automatic NDJSON ingestion and validation
  8. FHIR API Access → Query using HealthLake endpoints

Phase 3: Query and analytics

After the data is in AWS HealthLake, it’s immediately queryable using standard FHIR R4 API endpoints. Python scripts authenticate using AWS Signature Version 4 (SigV4) and support searches by patient, condition, medication, or lab result type.

How the services connect

Understanding the service interconnections is key to customizing or extending this solution.

Amazon S3 as the pipeline backbone

Amazon S3 plays a dual role: it’s both the entry point for raw PDFs and the handoff layer between processing stages. Amazon S3 event notifications remove the need for polling. When a PDF lands in the input bucket, the BDA Lambda fires immediately. When BDA writes its JSON output to the output bucket, the FHIR Processor Lambda fires automatically. This decoupled design means that each stage can scale independently.
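As a minimal sketch of that first hop, the function below pulls the bucket and object key out of an S3 event notification. The function name extract_pdf_objects and the .pdf filter are illustrative assumptions, not code from the repository. The event layout (Records[].s3.bucket.name and Records[].s3.object.key) and the URL-encoding of keys follow the standard S3 notification format.

```python
from urllib.parse import unquote_plus


def extract_pdf_objects(event: dict) -> list:
    """Return (bucket, key) pairs for every PDF referenced in an S3 event."""
    objects = []
    for record in event.get("Records", []):
        s3_info = record.get("s3", {})
        bucket = s3_info.get("bucket", {}).get("name")
        # S3 URL-encodes object keys in notifications (spaces arrive as "+").
        key = unquote_plus(s3_info.get("object", {}).get("key", ""))
        # Hypothetical filter: ignore anything that is not a PDF.
        if bucket and key.lower().endswith(".pdf"):
            objects.append((bucket, key))
    return objects
```

A Lambda handler could loop over the returned pairs and submit one extraction job per PDF. Decoding the key first matters because a file name with spaces would otherwise point at an object that does not exist.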

Amazon Bedrock Data Automation as the intelligence layer

When Lambda triggers the extraction job, BDA retrieves the PDF from Amazon S3 and applies a custom medical blueprint, a schema that defines the more than 50 clinical fields to extract. The service interprets document structure without requiring templates or training data. Each extracted field is returned with a confidence score (0.0–1.0), which the FHIR Processor Lambda compares against validation thresholds before conversion.
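One way to picture the threshold step is a simple partition of extracted fields by confidence score. The sketch below assumes each field arrives as a dictionary with value and confidence keys and uses a cutoff of 0.95. Both the shape and the cutoff are assumptions for illustration; the repository may structure its output and thresholds differently.

```python
def split_by_confidence(fields: dict, threshold: float = 0.95):
    """Partition extracted fields into accepted and needs-review groups."""
    accepted, needs_review = {}, {}
    for name, field in fields.items():
        bucket = accepted if field["confidence"] >= threshold else needs_review
        bucket[name] = field["value"]
    return accepted, needs_review


# Confidence values borrowed from the sample output shown in this post;
# the field names are hypothetical.
extracted = {
    "condition_1": {"value": "Hypothyroidism (E03.9)", "confidence": 0.98},
    "condition_4": {"value": "Osteoarthritis (M19.90)", "confidence": 0.92},
}
accepted, needs_review = split_by_confidence(extracted)
print(sorted(needs_review))  # ['condition_4']
```

Fields that fall below the cutoff could be routed to a human reviewer rather than dropped, which keeps the FHIR output trustworthy without discarding data.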

AWS Lambda as the transformation layer

The two Lambda functions are intentionally narrow in scope:

  • The BDA Trigger Lambda receives the Amazon S3 event, constructs the BDA API call, and submits the processing job.
  • The FHIR Processor Lambda reads BDA’s JSON output, maps each extracted field to the appropriate FHIR R4 resource type, assembles a FHIR Bundle, exports it as NDJSON, and triggers an AWS HealthLake import job.

This separation of concerns makes each function independently testable and replaceable.
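The mapping step in the FHIR Processor can be sketched as follows. This is not the repository's implementation: the input record shape, the function names, and the choice to generate IDs with uuid4 are assumptions. The resource fields used (resourceType, subject.reference, code.coding) and the ICD-10-CM system URI come from the FHIR R4 specification.

```python
import json
import uuid


def to_fhir_resources(record: dict) -> list:
    """Map a simplified extraction record to a Patient and its Conditions."""
    patient_id = str(uuid.uuid4())
    family, _, given = record["name"].partition(", ")
    patient = {
        "resourceType": "Patient",
        "id": patient_id,
        "name": [{"family": family, "given": [given] if given else []}],
    }
    conditions = [
        {
            "resourceType": "Condition",
            "id": str(uuid.uuid4()),
            # Link each Condition back to its Patient.
            "subject": {"reference": f"Patient/{patient_id}"},
            "code": {
                "coding": [{
                    "system": "http://hl7.org/fhir/sid/icd-10-cm",
                    "code": condition["icd10"],
                    "display": condition["name"],
                }]
            },
        }
        for condition in record.get("conditions", [])
    ]
    return [patient, *conditions]


def to_ndjson(resources: list) -> str:
    """Serialize one resource per line, the layout NDJSON requires."""
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in resources)
```

Keeping the mapping and the serialization in separate functions means each can be unit tested without touching Amazon S3 or AWS HealthLake.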

AWS HealthLake as the FHIR data store

AWS HealthLake receives the NDJSON import, validates each resource against the FHIR R4 specification, creates relationships between resources (for example, linking Condition resources to their Patient), indexes data for efficient querying, and generates unique FHIR resource IDs. The result is a fully queryable FHIR data store accessible through authenticated API calls.

IAM roles as the security fabric

Each service communicates with the next using IAM roles with least-privilege permissions. There are no hardcoded credentials and no overly broad policies. Lambda functions assume roles that grant only the specific actions they need (for example, bedrock:InvokeDataAutomationAsync and s3:GetObject for the BDA Trigger Lambda).


Walkthrough

This walkthrough takes you from prerequisites through deployment and verification.

Prerequisites

Before deploying, confirm you have the following:

Required software:

  • Python 3.10 or later.
  • Poetry (Python dependency management).
  • AWS Command Line Interface (AWS CLI) configured with appropriate credentials.

Verify your Poetry installation:

poetry --version

If you need to install Poetry:

curl -sSL https://install.python-poetry.org | python3 -

Required AWS permissions:

You need IAM permissions for the following services:

  • Amazon Bedrock Data Automation.
  • AWS CloudFormation (create, update, and delete stacks).
  • Amazon S3 (create buckets, upload and download objects).
  • AWS Lambda (create and update functions).
  • AWS Identity and Access Management (IAM) (create roles and policies).
  • AWS HealthLake (create data stores).

Supported AWS Regions:

This solution currently supports only the US East (N. Virginia) Region (us-east-1) and the US West (Oregon) Region (us-west-2). These are the Regions where Amazon Bedrock Data Automation is available.


Deploy the pipeline

Deployment takes approximately 15–20 minutes. Follow these four steps to go from zero to a fully deployed pipeline:

# 1. Clone the repository and install dependencies
git clone <repository-url>
cd Medical-Record-Digitization-and-FHIR-Integration-Pipeline
poetry install

# 2. Configure your environment
poetry run python src/utils/setup_env.py

# 3. Deploy the CloudFormation stack (approximately 15 minutes)
poetry run python src/automation/deploy.py

# 4. Verify deployment
aws cloudformation describe-stacks \
  --stack-name bda-medical-records-stack \
  --query 'Stacks[0].StackStatus'
# Expected output: "CREATE_COMPLETE"

The deployment creates the following resources:

  • Amazon Bedrock Data Automation blueprint and project (custom medical records schema with over 50 fields).
  • Amazon S3 input and output buckets with automatic event notifications.
  • Two AWS Lambda functions (BDA Trigger and FHIR Processor).
  • AWS HealthLake FHIR R4 data store.
  • AWS Identity and Access Management (IAM) roles and policies with least-privilege permissions.
  • Amazon CloudWatch log groups for all Lambda executions.

For manual environment configuration, advanced deployment options, and troubleshooting, see the GitHub repository.


See it in action

After the stack is deployed, upload a sample medical record to trigger the full pipeline. You can use the sample provided in the GitHub repository.

# Get your input bucket name from the CloudFormation stack output
INPUT_BUCKET=$(aws cloudformation describe-stacks \
  --stack-name bda-medical-records-stack \
  --query 'Stacks[0].Outputs[?OutputKey==`InputBucketName`].OutputValue' \
  --output text)

# Upload a sample PDF (use the synthetic records included in the repository)
aws s3 cp samples/medical-record-sample.pdf s3://$INPUT_BUCKET/

# Track BDA processing jobs
poetry run python src/utils/track_bda_jobs.py

Within 2–3 minutes, Amazon Bedrock Data Automation processes the PDF and the FHIR Processor Lambda imports the results into HealthLake. View the extracted data:

poetry run python src/utils/view_results.py

Example output:

Found 8 result files in output bucket
Processing: medical-record-sample_results.json

Patient Information:
---------------------
Name: Wilkins, Samantha
Patient ID: A1B2C3D4
Date of Birth: 10/28/1953

Conditions (5 found):
- Hypothyroidism (ICD-10: E03.9) - Confidence: 0.98
- Vitamin D Deficiency (ICD-10: E55.9) - Confidence: 0.95
- Hypertension (ICD-10: I10) - Confidence: 0.97
- Osteoarthritis (ICD-10: M19.90) - Confidence: 0.92
- Gastroesophageal Reflux Disease (ICD-10: K21.9) - Confidence: 0.96

Medications (4 found):
- Levothyroxine 100 mcg daily
- Vitamin D3 2000 IU daily
- Lisinopril 10 mg daily
- Omeprazole 20 mg daily

Lab Results (16 tests):
TSH: 2.3 mIU/L (Normal range: 0.4-4.0) ✓
Vitamin D: 28 ng/mL (Normal range: 30-100) ⚠
Blood Pressure: 128/82 mmHg (Stage 1 Hypertension) ⚠

[✅] FHIR conversion complete
[✅] Imported to HealthLake datastore: ds-abc123xyz456

Query FHIR data from AWS HealthLake

After ingestion, query your data using the interactive FHIR query interface:

poetry run python src/utils/query_medical_data.py

Supported FHIR query patterns:

# Search by patient name
Patient?name=Wilkins

# Get conditions for a specific patient
Condition?patient=Patient/47ef817a-9826-4498-b693-2af5eb2b5250

# Get lab results only
Observation?category=laboratory

# Get vital signs only
Observation?category=vital-signs

# Get all medications
MedicationRequest
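To turn these patterns into full request URLs, a small helper along the following lines may be useful. It's a sketch: the function name build_search_url is hypothetical, and the endpoint layout is the one used by the Python example in this post.

```python
from urllib.parse import urlencode


def build_search_url(region: str, datastore_id: str, resource_type: str, params: dict = None) -> str:
    """Compose a HealthLake FHIR R4 search URL for one resource type."""
    base = (
        f"https://healthlake.{region}.amazonaws.com"
        f"/datastore/{datastore_id}/r4/{resource_type}"
    )
    # urlencode percent-encodes reserved characters such as "/" in values.
    return f"{base}?{urlencode(params)}" if params else base


print(build_search_url("us-west-2", "ds-abc123xyz456", "Observation", {"category": "laboratory"}))
```

Building the query string with urlencode rather than string concatenation keeps values such as Patient/<id> safely encoded before the request is signed.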

Python example, authenticated FHIR API call:

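import os

import boto3
import requests
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest

# Resolve credentials from the default AWS credential chain
session = boto3.Session()
credentials = session.get_credentials()
region = os.environ.get('AWS_REGION', 'us-west-2')
datastore_id = os.environ['DATASTORE_ID']  # fail fast if the data store ID is not set

# Build the FHIR search request and sign it with SigV4 for the healthlake service
url = f'https://healthlake.{region}.amazonaws.com/datastore/{datastore_id}/r4/Patient?name=Wilkins'
request = AWSRequest(method='GET', url=url, headers={'Accept': 'application/fhir+json'})
SigV4Auth(credentials, 'healthlake', region).add_auth(request)

# Send the signed headers unchanged and raise on HTTP errors before parsing the response
response = requests.get(url, headers=dict(request.headers), timeout=30)
response.raise_for_status()
print(response.json())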
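A FHIR search returns a Bundle rather than a bare list, so the response usually needs one more step before it is useful. The helpers below are a sketch with hypothetical names. They rely only on the standard FHIR R4 Bundle layout, where matches sit under entry[].resource and a link with relation "next" points at the following page when one exists.

```python
from typing import Optional


def resources_from_bundle(bundle: dict) -> list:
    """Return the resources carried in a FHIR searchset Bundle."""
    return [entry["resource"] for entry in bundle.get("entry", []) if "resource" in entry]


def next_page_url(bundle: dict) -> Optional[str]:
    """Return the URL of the next result page, or None on the last page."""
    for link in bundle.get("link", []):
        if link.get("relation") == "next":
            return link.get("url")
    return None
```

A caller could keep requesting next_page_url until it returns None, signing each request the same way as the first one.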

Security considerations

This is a demonstration sample for synthetic data only. Do not use with real Protected Health Information (PHI) without implementing the controls listed in the following sections.

Security controls included in this sample:

  • IAM roles with least-privilege permissions.
  • Amazon S3 bucket access controls (private by default).
  • AWS KMS encryption for AWS HealthLake data at rest.
  • AWS service-to-service authorization using IAM roles.
  • Amazon CloudWatch logging for audit trails.

Additional controls required for production PHI workloads:

AWS HealthLake is a HIPAA Eligible Service. Customers must review the AWS Shared Responsibility Model to understand their security and compliance obligations. Before processing real patient data, implement the following:

  1. AWS Business Associate Addendum (BAA): Required under HIPAA before processing PHI on AWS.
  2. Amazon Virtual Private Cloud (Amazon VPC) isolation: Lambda functions and AWS HealthLake in private subnets with AWS PrivateLink.
  3. Comprehensive logging: AWS CloudTrail, AWS Config, Amazon S3 access logs, and Amazon VPC flow logs.
  4. Encryption in transit: TLS 1.2 or later. Use Amazon VPC endpoints to avoid public internet exposure.
  5. Access controls: Multi-factor authentication (MFA), role-based access control (RBAC), temporary credentials, and regular access reviews.
  6. Compliance monitoring: AWS Security Hub with HIPAA compliance checks.
  7. Data lifecycle management: Retention policies, secure deletion, and data loss prevention (DLP) controls.

For full guidance, see the AWS HIPAA Compliance page.

Pricing

The following estimates apply to testing with approximately 100 medical records per month in the US West (Oregon) Region:

| Service | Usage | Estimated monthly cost |
|---|---|---|
| Amazon Bedrock Data Automation | 100 pages (approximately $0.20–$0.30/page) | $20–$30 |
| AWS HealthLake | 5 GB storage + 100 queries | $15–$20 |
| AWS Lambda | 200 invocations (512 MB, approximately 30s avg) | $5–$10 |
| Amazon S3 | 1 GB storage + 200 requests | $1–$2 |
| AWS KMS | 1 customer managed key | $1 |
| Total | | Approximately $50–$100/month |

For production workloads processing 10,000 records per month, expect costs in the range of $2,000–$3,000/month. The primary cost drivers are BDA (charged per page), HealthLake (charged per search request), and VPC endpoints (hourly PrivateLink charges in production deployments).
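To sanity-check the test-scale estimate, you can add up the itemized ranges yourself. The sketch below copies the low and high figures from the pricing table and works in whole dollars. The variable names are illustrative.

```python
# (low, high) monthly estimates in USD, copied from the pricing table.
line_items = {
    "Amazon Bedrock Data Automation": (20, 30),  # 100 pages at $0.20-$0.30 per page
    "AWS HealthLake": (15, 20),
    "AWS Lambda": (5, 10),
    "Amazon S3": (1, 2),
    "AWS KMS": (1, 1),
}

low_total = sum(low for low, _ in line_items.values())
high_total = sum(high for _, high in line_items.values())
print(f"Itemized total: ${low_total}-${high_total}/month")  # Itemized total: $42-$63/month
```

The itemized figures sum to $42–$63, below the upper end of the quoted $50–$100 total. It is reasonable to read the quoted range as headroom for charges the table does not itemize, though that is an assumption, and actual costs depend on your usage.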

Cost optimization tips:

  • Delete the CloudFormation stack when not actively testing: aws cloudformation delete-stack --stack-name bda-medical-records-stack.
  • Set up AWS Budgets alerts to catch unexpected costs early.
  • Monitor Lambda duration in CloudWatch to optimize function execution time.

Clean up

To avoid ongoing charges, delete the CloudFormation stack when you’re done:

aws cloudformation delete-stack --stack-name bda-medical-records-stack
aws cloudformation wait stack-delete-complete --stack-name bda-medical-records-stack

For cleanup of manually created Amazon Bedrock Data Automation projects and S3 bucket contents, see the GitHub repository.

What’s next

After you deploy, you can extend this foundation to:

  • Integrate with existing electronic health records (EHR) systems through FHIR APIs.
  • Build analytics dashboards using Amazon Quick Sight.
  • Add natural language search with Amazon Kendra.
  • Add Amazon Simple Queue Service (Amazon SQS) as a buffer between Amazon S3 events and the BDA Trigger Lambda to handle burst uploads and manage BDA concurrency limits at scale.
  • Orchestrate with AWS Step Functions for error handling, retry logic, and routing low-confidence extractions to human review.
  • Implement real-time, high-volume processing with Amazon Kinesis Data Streams for continuous ingestion from multiple sources.

Conclusion

In this post, you saw how Amazon Bedrock Data Automation, AWS Lambda, Amazon S3, and AWS HealthLake work together to automate the transformation of scanned medical records into FHIR R4-compliant data. The event-driven architecture removes manual data entry, scales without custom machine learning models, and makes historical records accessible to modern care delivery systems.

Key takeaways:

  • Amazon Bedrock Data Automation extracts over 50 structured clinical fields from PDFs without template configuration.
  • AWS Lambda orchestrates the pipeline with two focused, event-driven functions.
  • Amazon S3 event notifications decouple each stage, so each can scale independently.
  • AWS HealthLake validates, indexes, and exposes FHIR R4 data through standard APIs.
  • Security controls are the customer’s responsibility under the AWS Shared Responsibility Model.

To explore the full source code, advanced configuration options, and customization guidance, visit the GitHub repository.


This solution is intended for educational purposes using synthetic data. Review the security considerations and consult your compliance team before deploying in any environment with real patient data.


Minisforum S5 All-Flash NAS Shown Based on Intel’s Wildcat Lake Platform

Post Syndicated from Ryan Smith original https://www.servethehome.com/minisforum-s5-all-flash-nas-shown-based-on-intels-wildcat-lake-platform/

At Computex 2026, Minisforum was showing off its upcoming S5 NAS, a mid-range all-flash NAS. With 5 M.2 SSD slots and 10GbE networking, the fanless NAS punches up

The post Minisforum S5 All-Flash NAS Shown Based on Intel’s Wildcat Lake Platform appeared first on ServeTheHome.

Upgrade PySpark from Spark 3.5 to Spark 4.0 with AWS Spark Upgrade Agent

Post Syndicated from Prasad Nadig original https://aws.amazon.com/blogs/big-data/upgrade-pyspark-from-spark-3-5-to-spark-4-0-with-aws-spark-upgrade-agent/

Upgrading Apache Spark applications across major versions means tracking down breaking changes, manually debugging failures from log files, and running repeated test cycles. This process can stretch across weeks for complex code bases.

In this post, we walk through a hands-on PySpark migration from Spark 3.5 to Spark 4.0 on Amazon EMR Serverless, using the AWS Spark Upgrade Agent. You’ll see how the agent iteratively validates your application on a live Amazon EMR Serverless application, automatically diagnosing and resolving failures from Amazon CloudWatch logs until the job succeeds. By the end, you have a multi-pipeline PySpark application running on Spark 4.0 with four distinct breaking changes resolved. The fixes include configuration key removals, codec renames, and stricter charset validation, all driven through natural language interaction in the Integrated Development Environment (IDE).

This is Part 2 of a three-part series on how the AWS Spark Upgrade Agent can automate and simplify Spark upgrades. Part 1 introduced the agent’s architecture and capabilities; this post applies them to a complete PySpark migration.

In the sections that follow, you will set up the prerequisites and infrastructure, explore the sample application, run the iterative validation workflow on EMR Serverless, review data quality results, and generate a comprehensive upgrade summary.

Note: Because this upgrade is performed using the AWS Spark Upgrade Agent Model Context Protocol (MCP) server, an agentic artificial intelligence (AI) system, the agent might take different paths to reach the same successful outcome. The workflow demonstrated here represents one successful upgrade path. The key takeaway is the end-to-end workflow: generating an upgrade plan, iteratively validating on Amazon EMR Serverless, and producing a comprehensive upgrade summary.

1. Prerequisites and setup

This section covers the tools, infrastructure, and IDE configuration you need before starting the upgrade. To follow along, you need an AWS account with an AWS Identity and Access Management (AWS IAM) user or role that has permissions to deploy AWS CloudFormation stacks, create AWS IAM roles and policies, and create Amazon EMR Serverless applications. Intermediate knowledge of AWS Command Line Interface (AWS CLI), AWS CloudFormation, and Python is helpful.

1.1 Install Kiro CLI and local tools

In this post, we use Kiro CLI to demonstrate the upgrade workflow, but you can use any MCP-compatible IDE or framework, such as VS Code with Cline, Cursor, Windsurf, or Claude Desktop. For more details on installation and setup, refer to Setup for Upgrade Agent. To follow along with Kiro CLI, install it on your workstation:

curl -fsSL https://cli.kiro.dev/install | bash

Run the following command and use your builder ID to log in:

kiro-cli login --use-device-flow

With the Kiro CLI installed and logged in, rather than installing the remaining tools manually, use Kiro CLI to set up and verify your prerequisites with the following prompt:

kiro-cli chat
> Install AWS CLI, Python 3.10, and uv on my system if they are not already installed

Figure: Output of the AWS CLI and local tools install step, showing successful installation of AWS CLI, Python, and uv.

The upgrade workflow depends on all three tools: the AWS CLI, Python 3.10, and uv.

1.2 Infrastructure setup (AWS CloudFormation)

Two AWS CloudFormation stacks create the required resources. The first creates an AWS IAM role and an Amazon Simple Storage Service (Amazon S3) staging bucket. The second creates the source (Spark 3.5.0) and target (Spark 4.0.1) Amazon EMR Serverless applications and their shared execution role.

Stack 1 – AWS IAM role and Amazon S3 staging bucket:

The spark-upgrade-mcp-setup template creates the AWS IAM role and Amazon S3 staging bucket required by the upgrade agent. Choose the Launch Stack button for your Region. For additional Regions, see the full region list.

| # | Region | Launch |
|---|---|---|
| 1 | US East (N. Virginia) | Launch Stack |
| 2 | US East (Ohio) | Launch Stack |
| 3 | US West (Oregon) | Launch Stack |
| 4 | Europe (Ireland) | Launch Stack |

After deployment, open the AWS CloudFormation Outputs tab, copy the ExportCommand value, and run it in your terminal. This sets SMUS_MCP_REGION, IAM_ROLE, and STAGING_BUCKET_PATH automatically.

Figure: Outputs tab of the CloudFormation stack, showing ExportCommand with SMUS_MCP_REGION, IAM_ROLE, and STAGING_BUCKET_PATH values.

# Sets SMUS_MCP_REGION, IAM_ROLE, and STAGING_BUCKET_PATH
export SMUS_MCP_REGION=<YOUR-REGION> && export IAM_ROLE=arn:aws:iam::<YOUR-ACCOUNT-ID>:role/spark-upgrade-role-* && export STAGING_BUCKET_PATH=<YOUR-BUCKET>

Then configure the AWS CLI profile:

aws configure set profile.spark-upgrade-profile.role_arn ${IAM_ROLE}
aws configure set profile.spark-upgrade-profile.source_profile default
aws configure set profile.spark-upgrade-profile.region ${SMUS_MCP_REGION}

Stack 2 – Amazon EMR Serverless source and target applications and execution role:

git clone https://github.com/aws-samples/sample-amazon-emr-spark4-examples
cd sample-amazon-emr-spark4-examples/pyspark/AWSSpark4AutoUpgradeDemo

The PySpark sample lives at resources/global_logistics_platform/. The AWS CloudFormation template lives at resources/cloudformation/.

Deploy the AWS CloudFormation template to create the source and target Amazon EMR Serverless applications and a shared execution role:

aws cloudformation deploy \
  --template-file resources/cloudformation/emr-serverless-target-setup.yaml \
  --stack-name spark-emr-serverless-upgrade \
  --region ${SMUS_MCP_REGION} \
  --capabilities CAPABILITY_NAMED_IAM \
  --parameter-overrides \
    StagingBucketName=${STAGING_BUCKET_PATH} \
    SourceReleaseLabel=emr-7.0.0 \
    TargetReleaseLabel=emr-spark-8.0-preview \
    SourceApplicationName=spark-upgrade-source \
    TargetApplicationName=spark-upgrade-target

This creates two Amazon EMR Serverless applications: a source (Spark 3.5.0) for data quality baseline and a target (Spark 4.0.1) for upgrade validation, with a shared execution role. Both applications auto-stop after 15 minutes of idle time, so there is no cost when not in use. To upgrade between different Spark versions, override SourceReleaseLabel and TargetReleaseLabel with the Amazon EMR release labels for your source and target versions.

After the stack completes deployment, note the outputs:

aws cloudformation describe-stacks \
  --stack-name spark-emr-serverless-upgrade \
  --region ${SMUS_MCP_REGION} \
  --query "Stacks[0].Outputs" --output table

This gives you the SourceApplicationId, TargetApplicationId, and ExecutionRoleArn needed for the upgrade prompt. Make a note of them.
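If you prefer to read these values programmatically, the describe-stacks response can be flattened into a dictionary. The helper name outputs_to_dict below is hypothetical. The response shape (Stacks[0].Outputs with OutputKey and OutputValue) is the one CloudFormation returns.

```python
def outputs_to_dict(describe_stacks_response: dict) -> dict:
    """Flatten CloudFormation stack outputs into {OutputKey: OutputValue}."""
    outputs = describe_stacks_response["Stacks"][0].get("Outputs", [])
    return {output["OutputKey"]: output["OutputValue"] for output in outputs}


# Possible usage with boto3 (requires AWS credentials, so it is left commented out):
# import boto3
# cfn = boto3.client("cloudformation")
# response = cfn.describe_stacks(StackName="spark-emr-serverless-upgrade")
# ids = outputs_to_dict(response)
# print(ids["SourceApplicationId"], ids["TargetApplicationId"], ids["ExecutionRoleArn"])
```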

1.3 IDE and MCP server configuration

Configure the spark-upgrade MCP server. For Kiro CLI:

kiro-cli-chat mcp add \
    --name "spark-upgrade" \
    --command "uvx" \
    --args '[
      "mcp-proxy-for-aws@latest",
      "https://sagemaker-unified-studio-mcp.'${SMUS_MCP_REGION}'.api.aws/spark-upgrade/mcp",
      "--service", "sagemaker-unified-studio-mcp",
      "--profile", "spark-upgrade-profile",
      "--region", "'${SMUS_MCP_REGION}'",
      "--read-timeout", "180"
    ]' \
    --timeout 180000 \
    --scope global

For other MCP clients, refer to your IDE’s MCP configuration documentation and use the same server parameters shown previously.
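Many MCP clients read server definitions from a JSON file rather than a CLI command. As a rough sketch, the script below prints the same server parameters in the mcpServers layout that several clients use. That top-level key and the file location vary by client, so treat both as assumptions and check your client’s documentation. The function name is hypothetical.

```python
import json


def mcp_server_config(region: str, profile: str = "spark-upgrade-profile") -> dict:
    """Build a JSON-style MCP server entry from the parameters used in this post."""
    return {
        "mcpServers": {
            "spark-upgrade": {
                "command": "uvx",
                "args": [
                    "mcp-proxy-for-aws@latest",
                    f"https://sagemaker-unified-studio-mcp.{region}.api.aws/spark-upgrade/mcp",
                    "--service", "sagemaker-unified-studio-mcp",
                    "--profile", profile,
                    "--region", region,
                    "--read-timeout", "180",
                ],
            }
        }
    }


print(json.dumps(mcp_server_config("us-east-1"), indent=2))
```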

Verify the connection: Start Kiro CLI and confirm the spark-upgrade tools are loaded:

$ kiro-cli chat
...
spark-upgrade (MCP):
- generate_spark_upgrade_plan          * not trusted
- update_build_configuration           * not trusted
- fix_upgrade_failure                  * not trusted
- run_validation_job                   * not trusted
- check_job_status                     * not trusted
...

Tip: After Kiro CLI and the MCP server are configured, you can ask the agent to verify your setup. For example: “Check if I have AWS CLI, Python 3.10+, and uv installed, and confirm the spark-upgrade MCP server is connected.”

Figure: Output showing the status of each tool, AWS CLI, and the spark-upgrade MCP server connection.

Tip: When running the upgrade agent in Kiro CLI, you can approve tool invocations in trust mode or confirm mode:

  • Trust mode: Type t when prompted to approve a tool. The agent auto-approves subsequent uses of that tool without asking for confirmation. You can also use /tools trust-all to trust every tool at once for a fully autonomous experience.
  • Confirm mode: Type y for each individual tool invocation. This lets you review, verify, and approve every action before the agent runs it. If this is your first time using the agent, use confirm mode for full visibility.

2. Hands-on PySpark upgrade from Spark 3.5 to Spark 4.0

This section walks through the complete migration of a representative PySpark application from Amazon EMR Serverless 7.0.0 (Spark 3.5.0) to EMR Serverless with the emr-spark-8.0-preview release label (Spark 4.0.1), using the global_logistics_platform sample.

2.1 Sample project: global logistics platform

The sample application is a multi-domain PySpark data processing application with three pipelines:

  • Fleet management: Processes vehicle telemetry data (GPS tracking, fuel consumption, driver behavior scoring) using window functions, lag/lead operations, and statistical aggregations. Writes Parquet with lz4raw compression.
  • International shipping: Handles cross-border shipment documents with multi-language address standardization using character encoding functions (encode/decode with charsets like Shift_JIS, GB2312, EUC-KR), and processes carrier manifests with ISO-8859-1 encoding.
  • Historical compliance: Processes regulatory audit records spanning centuries (including pre-1582 Julian calendar dates), requiring legacy datetime rebasing for Parquet writes.

Project structure:

global_logistics_platform/
├── main.py                          # Orchestrator - runs all 3 pipelines
├── src/
│   ├── utils/
│   │   └── spark_config.py          # Spark session config & logging
│   └── domain/                      # Application code that needs migration
│       ├── fleet_management/
│       │   └── telemetry_processor.py
│       ├── international_shipping/
│       │   └── shipment_processor.py
│       └── historical_compliance/
│           └── compliance_processor.py
└── data/                             # Sample dataset for the workflow
    └── sample/
        ├── fleet_telemetry.csv
        ├── international_shipments.csv
        └── compliance_records.csv

2.2 The four Spark 4.0 incompatibilities

Before diving into the upgrade, here are the four specific breaking changes present in this code base that the agent discovers and resolves entirely through runtime validation:

| # | Incompatibility | File(s) |
|---|---|---|
| 1 | Legacy Parquet configuration key removed: spark.sql.legacy.parquet.datetimeRebaseModeInWrite removed in Spark 4.0. Must use spark.sql.parquet.datetimeRebaseModeInWrite. | spark_config.py |
| 2 | Parquet compression codec rename: lz4raw codec renamed to lz4_raw in Spark 4.0. | telemetry_processor.py |
| 3 | Stricter charset encoding validation: Spark 4.0 tightened encode() behavior. Encoding CJK (Chinese, Japanese, Korean) characters to ISO-8859-1 now throws MALFORMED_CHARACTER_CODING. In Spark 3.x this silently replaced unmappable chars with ?. Restored via spark.sql.legacy.codingErrorAction. | spark_config.py |
| 4 | Character encoding restrictions: encode()/decode() in Spark 4.0 supports US-ASCII, ISO-8859-1, UTF-8, UTF-16BE, UTF-16LE, UTF-16, and UTF-32. Code uses Shift_JIS, GB2312, EUC-KR. | shipment_processor.py |

The agent resolves each of these through iterative runtime validation on EMR Serverless: submitting the job, diagnosing failures from Amazon CloudWatch logs, applying fixes, and resubmitting until the job succeeds.
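The agent finds these issues at run time, but a quick static scan can complement that by flagging the patterns that are visible in source text. The sketch below is not part of the agent and is not how it works. It's a hypothetical pre-check built from the strings named in the list of incompatibilities. It covers three of the four, because the ISO-8859-1 failure depends on the data being encoded rather than on the code.

```python
import re

# Patterns taken from the incompatibilities listed in this section.
SPARK4_PATTERNS = {
    "legacy Parquet rebase key": re.compile(
        r"spark\.sql\.legacy\.parquet\.datetimeRebaseModeInWrite"
    ),
    "lz4raw codec": re.compile(r"""["']lz4raw["']"""),
    "unsupported charset": re.compile(
        r"""["'](Shift_JIS|GB2312|EUC-KR)["']""", re.IGNORECASE
    ),
}


def scan_source(text: str) -> list:
    """Return (line_number, label) for each known Spark 4.0 pattern found."""
    findings = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        for label, pattern in SPARK4_PATTERNS.items():
            if pattern.search(line):
                findings.append((lineno, label))
    return findings
```

A scan like this only catches literal strings. Values built dynamically or read from configuration files would still surface only through runtime validation.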

Architecture diagram showing the iterative validation workflow between the IDE, MCP server, and Amazon EMR Serverless

2.3 Step 1: Invoke the upgrade agent

Open the project in Kiro CLI and enter the following prompt:

Upgrade my Spark application in the current directory from EMR serverless version 7.0.0 to EMR serverless version 8.0.0.
Use Amazon EMR Serverless target app-id <YOUR-TARGET-APP-ID> and execution role
<YOUR-EXECUTION-ROLE-ARN> for validation.
Use source Amazon EMR Serverless app-id <YOUR-SOURCE-APP-ID> for data quality baseline.
Store artifacts at s3://${STAGING_BUCKET_PATH}/spark4-upgrade/python/
Enable data quality validation

Tip: The SourceApplicationId, TargetApplicationId, and ExecutionRoleArn are in the Outputs of the spark-emr-serverless-upgrade AWS CloudFormation stack you deployed in Section 1.2.

The agent invokes generate_spark_upgrade_plan, scans the project structure, identifies the Spark version mapping (EMR 7.0.0 → Spark 3.5.0, EMR 8.0.0 → Spark 4.0.1), and produces a structured upgrade plan with an Analysis ID for traceability.

The agent presents the plan and asks for confirmation. Type y to approve the tool invocation, or t to trust that tool for the rest of the session.

You can save the plan as a local JSON file for future reference or to resume the upgrade later. Use the following prompt to ask Kiro to save it and to provide the AWS CLI profile configured on your system:

Yes I would like to save the plan to a local file and use spark-upgrade-profile

2.4 Step 2: Build and package

The agent validates that the Python project compiles successfully, then packages it for Amazon EMR Serverless deployment:

  • Runs py_compile on each .py file to verify syntax.
  • Creates src.zip containing the src/ directory (preserving the import structure used by from src.utils import ...).
  • Uploads src.zip, main.py, and sample input data to the Amazon S3 staging path.

# What the agent does behind the scenes:
zip -r src.zip src/
aws s3 cp main.py s3://<YOUR-BUCKET>/spark4-upgrade/python/<ANALYSIS-ID>/source/main.py
aws s3 cp src.zip s3://<YOUR-BUCKET>/spark4-upgrade/python/<ANALYSIS-ID>/source/src.zip
aws s3 cp data/sample/ s3://<YOUR-BUCKET>/spark4-upgrade/python/<ANALYSIS-ID>/input/ --recursive

This sample has no external dependencies (no requirements.txt), so no virtual environment is needed. If your project has external dependencies in a requirements.txt, the agent will package them into a virtual environment archive and include it in the EMR Serverless submission parameters.
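The compile-and-package step can be approximated in a few lines of standard-library Python. This is a sketch of the idea, not the agent’s code. The function name is hypothetical, and unlike zip -r it archives only .py files.

```python
import py_compile
import zipfile
from pathlib import Path


def compile_and_package(project_dir: str, archive_name: str = "src.zip") -> Path:
    """Syntax-check main.py and src/, then zip src/ with its package layout."""
    root = Path(project_dir)
    sources = sorted((root / "src").rglob("*.py"))
    for source in [root / "main.py", *sources]:
        # doraise=True raises PyCompileError instead of printing to stderr.
        py_compile.compile(str(source), doraise=True)
    archive = root / archive_name
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_DEFLATED) as zf:
        for source in sources:
            # Keep paths relative to the project root so "from src..." imports resolve.
            zf.write(source, source.relative_to(root).as_posix())
    return archive
```

Storing entries as src/... matters because the archive is passed through --py-files, and the application imports its modules with the src prefix.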

2.5 Step 3: Data quality baseline on source application

Before migrating the code, the agent establishes a data quality baseline by running the original (pre-upgrade) code on the source Amazon EMR Serverless application (Spark 3.5.0 / EMR 7.0.0). This captures the expected output that the upgraded application must match.

The agent submits the job to the source application with data quality check enabled:

{
  "executionRoleArn": "arn:aws:iam::<YOUR-ACCOUNT-ID>:role/<YOUR-EXECUTION-ROLE>",
  "jobDriver": {
    "sparkSubmit": {
      "entryPoint": "s3://<YOUR-BUCKET>/spark4-upgrade/python/<ANALYSIS-ID>/source/main.py",
      "entryPointArguments": [
        "s3://<YOUR-BUCKET>/spark4-upgrade/python/<ANALYSIS-ID>/input/",
        "s3://<YOUR-BUCKET>/spark4-upgrade/python/<ANALYSIS-ID>/output/source/"
      ],
      "sparkSubmitParameters": "--py-files s3://<YOUR-BUCKET>/spark4-upgrade/python/<ANALYSIS-ID>/source/src.zip"
    }
  },
  "configurationOverrides": {
    "monitoringConfiguration": {
      "cloudWatchLoggingConfiguration": {
        "enabled": true,
        "logGroupName": "/aws/emr-serverless"
      }
    }
  }
}

The agent monitors the source run via check_job_status until it completes successfully. This baseline output is stored for comparison after the target validation succeeds.
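If you want to reproduce a submission like this outside the agent, the request can be assembled as keyword arguments for the boto3 emr-serverless client’s start_job_run call. The helper below is a sketch under stated assumptions: the function name and the output_label parameter are hypothetical, and the S3 layout mirrors the paths shown in the job configuration.

```python
def build_job_request(
    application_id: str,
    execution_role_arn: str,
    staging_prefix: str,
    output_label: str,
) -> dict:
    """Assemble start_job_run keyword arguments for one validation run."""
    prefix = staging_prefix.rstrip("/")
    return {
        "applicationId": application_id,
        "executionRoleArn": execution_role_arn,
        "jobDriver": {
            "sparkSubmit": {
                "entryPoint": f"{prefix}/source/main.py",
                "entryPointArguments": [
                    f"{prefix}/input/",
                    f"{prefix}/output/{output_label}/",
                ],
                "sparkSubmitParameters": f"--py-files {prefix}/source/src.zip",
            }
        },
        "configurationOverrides": {
            "monitoringConfiguration": {
                "cloudWatchLoggingConfiguration": {
                    "enabled": True,
                    "logGroupName": "/aws/emr-serverless",
                }
            }
        },
    }


# Possible usage (requires AWS credentials, so it is left commented out):
# import boto3
# emr = boto3.client("emr-serverless")
# run = emr.start_job_run(**build_job_request(app_id, role_arn, prefix, "source"))
```

Using a separate output_label for the source and target runs keeps their outputs side by side under the same analysis prefix, which simplifies the later comparison.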

2.6 Step 4: Iterative runtime validation on target application

This is the core of the upgrade. The agent submits the unmodified application to the target Amazon EMR Serverless application (Spark 4.0.1), and every incompatibility is discovered, diagnosed, and fixed through runtime failures. The agent drives the entire fix cycle by submitting to EMR, reading errors from Amazon CloudWatch logs, applying fixes, rebuilding, and resubmitting.

The agent presents the proposed Amazon EMR Serverless job configuration for your review before each submission. Type y to approve.
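Conceptually, the fix cycle is a bounded retry loop. The sketch below captures its shape with three injected callables standing in for the agent’s tools. All names here are hypothetical, and wait_for_status is assumed to block until the job reaches a terminal state such as SUCCESS or FAILED.

```python
from typing import Callable


def validate_until_success(
    submit_job: Callable[[], str],
    wait_for_status: Callable[[str], str],
    diagnose_and_fix: Callable[[str], bool],
    max_iterations: int = 10,
) -> int:
    """Resubmit until the job succeeds; return the number of submissions used."""
    for iteration in range(1, max_iterations + 1):
        job_run_id = submit_job()
        status = wait_for_status(job_run_id)
        if status == "SUCCESS":
            return iteration
        # Stop early when no fix can be applied; resubmitting would fail the same way.
        if not diagnose_and_fix(job_run_id):
            raise RuntimeError(f"No fix found for job run {job_run_id} ({status})")
    raise RuntimeError(f"Job did not succeed within {max_iterations} iterations")
```

Capping the number of iterations guards against a loop that keeps applying fixes without converging, and each new error message is a sign that the previous fix took effect.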

2.6.1 Fix 1: Legacy Parquet configuration key removed (iteration 1)

The first submission fails immediately at SparkSession initialization:

org.apache.spark.sql.AnalysisException:
The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInWrite' was removed
in the version 4.0.0. Use 'spark.sql.parquet.datetimeRebaseModeInWrite' instead.

The Historical Compliance pipeline configures spark.sql.legacy.parquet.datetimeRebaseModeInWrite for handling pre-1582 Julian calendar dates. Spark 4.0 removed the legacy. prefix from this configuration key.

The agent calls fix_upgrade_failure, which identifies the migration rule and recommends the fix:

File: src/utils/spark_config.py

# Before
.config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")

# After
.config("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")

After applying the fix, the agent rebuilds src.zip, re-uploads to Amazon S3, and resubmits the job.
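A key rename like this one can also be expressed as a small lookup applied to a configuration dictionary. The sketch below is hypothetical and is not how fix_upgrade_failure works internally. `RENAMED_KEYS` and `migrate_conf` are illustrative names, and the mapping lists only the key from the error above.

```python
# Removed Spark 3.x keys mapped to their Spark 4.0 replacements.
# Only the key from the error above is listed; other entries would have to
# come from the Spark 4.0 migration guide.
RENAMED_KEYS = {
    "spark.sql.legacy.parquet.datetimeRebaseModeInWrite":
        "spark.sql.parquet.datetimeRebaseModeInWrite",
}


def migrate_conf(conf: dict) -> tuple:
    """Return a copy of conf with removed keys renamed, plus notes on each rename."""
    migrated, notes = {}, []
    for key, value in conf.items():
        new_key = RENAMED_KEYS.get(key, key)
        if new_key != key:
            notes.append(f"{key} -> {new_key}")
        migrated[new_key] = value  # the value ("LEGACY" here) is kept unchanged
    return migrated, notes


old_conf = {
    "spark.sql.legacy.parquet.datetimeRebaseModeInWrite": "LEGACY",
    "spark.sql.shuffle.partitions": "200",
}
new_conf, notes = migrate_conf(old_conf)
print(new_conf)
print(notes)
```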

2.6.2 Fix 2: Parquet compression codec rename (iteration 2)

The resubmitted job fails with a new error, which confirms progress:

pyspark.errors.exceptions.captured.IllegalArgumentException:
[CODEC_NOT_AVAILABLE.WITH_AVAILABLE_CODECS_SUGGESTION]
The codec lz4raw is not available.
Available codecs are brotli, uncompressed, lzo, snappy, lz4_raw, none, zstd, lz4, gzip.
SQLSTATE: 56038

The Fleet Management pipeline’s telemetry_processor.py uses lz4raw as the Parquet compression codec. Spark 4.0 renamed this to lz4_raw (with an underscore).

The recommended fix:

File: src/domain/fleet_management/telemetry_processor.py

# Before
.option("compression", "lz4raw")

# After
.option("compression", "lz4_raw")

The agent applies the change, rebuilds, and resubmits.
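To catch this kind of rename before a job is submitted, a codec name can be checked against the list in the error message. The following is a rough sketch under that assumption. `resolve_parquet_codec` is a hypothetical helper, and the set of names is copied from the error output above rather than from the Spark API.

```python
# Codec names copied from the Spark 4.0 error message above.
AVAILABLE_CODECS = {
    "brotli", "uncompressed", "lzo", "snappy", "lz4_raw", "none", "zstd", "lz4", "gzip",
}

# Known rename from this walkthrough: old name -> new name.
RENAMED_CODECS = {"lz4raw": "lz4_raw"}


def resolve_parquet_codec(name: str) -> str:
    """Map a renamed codec to its new name and reject names not in the list."""
    codec = RENAMED_CODECS.get(name, name)
    if codec not in AVAILABLE_CODECS:
        raise ValueError(f"codec {name!r} is not available; choose from {sorted(AVAILABLE_CODECS)}")
    return codec


print(resolve_parquet_codec("lz4raw"))
print(resolve_parquet_codec("snappy"))
```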

2.6.3 Fix 3: Stricter charset encoding validation (iteration 3)

The next submission surfaces a different failure:

org.apache.spark.SparkRuntimeException:
[MALFORMED_CHARACTER_CODING]
Invalid value found when performing `encode` with ISO-8859-1
SQLSTATE: 22000

The International Shipping pipeline’s process_carrier_manifests() method uses encode(..., 'ISO-8859-1') on data containing CJK (Chinese, Japanese, Korean) characters. Although ISO-8859-1 is in Spark 4.0’s supported charset list, it is a single-byte encoding that cannot represent CJK characters. In Spark 3.x, the Java charset encoder silently replaced unmappable characters with ?. Spark 4.0 tightened this behavior to throw MALFORMED_CHARACTER_CODING for unmappable characters.

The agent identifies the migration rule and adds a legacy compatibility configuration:

File: src/utils/spark_config.py

# Added to SparkSession builder
.config("spark.sql.legacy.codingErrorAction", "true")

This restores the Spark 3.x behavior where unmappable characters are silently replaced instead of throwing errors.

With the configuration added, the agent rebuilds and resubmits.
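The difference between the lenient and strict behaviors can be reproduced in plain Python, without Spark. This is only an analogy: Python's `errors="replace"` and `errors="strict"` handlers stand in for the Spark 3.x and Spark 4.0 behaviors described above, and the sample string is made up.

```python
text = "Tokyo 東京"

# Lenient handling (analogous to Spark 3.x): each unmappable character becomes "?".
lenient = text.encode("iso-8859-1", errors="replace")
print(lenient)  # b'Tokyo ??'

# Strict handling (analogous to Spark 4.0): unmappable characters raise an error.
try:
    text.encode("iso-8859-1", errors="strict")
    strict_failed = False
except UnicodeEncodeError as exc:
    strict_failed = True
    print(f"strict encode failed: {exc.reason}")
```

The lenient path keeps the job running but loses the original characters for good, which is worth weighing before relying on the legacy setting in the long term.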

2.6.4 Fix 4: Character encoding restrictions (iteration 4)

The fourth submission fails with yet another encoding error:

org.apache.spark.SparkIllegalArgumentException:
[INVALID_PARAMETER_VALUE.CHARSET]
The value of parameter(s) `charset` in `encode` is invalid:
expects one of the iso-8859-1, us-ascii, utf-16, utf-16be, utf-16le, utf-32, utf-8,
but got Shift_JIS. SQLSTATE: 22023

The International Shipping pipeline’s standardize_addresses_with_charset() method uses Shift_JIS, GB2312, and EUC-KR charsets in encode()/decode() calls. Spark 4.0 restricts these functions to seven standard charsets. These regional charsets are not in the supported list.

The agent replaces the unsupported charsets with UTF-8:

File: src/domain/international_shipping/shipment_processor.py

Before (Spark 3.5.0):

df = df.withColumn(
    "shipper_address_normalized",
    when(col("origin_country") == "JP",
         expr("decode(encode(shipper_address, 'Shift_JIS'), 'UTF-8')"))
    .when(col("origin_country") == "CN",
         expr("decode(encode(shipper_address, 'GB2312'), 'UTF-8')"))
    .when(col("origin_country") == "KR",
         expr("decode(encode(shipper_address, 'EUC-KR'), 'UTF-8')"))
    .otherwise(col("shipper_address"))
)

After (Spark 4.0.1):

df = df.withColumn(
    "shipper_address_normalized",
    when(col("origin_country") == "JP",
         expr("decode(encode(shipper_address, 'UTF-8'), 'UTF-8')"))
    .when(col("origin_country") == "CN",
         expr("decode(encode(shipper_address, 'UTF-8'), 'UTF-8')"))
    .when(col("origin_country") == "KR",
         expr("decode(encode(shipper_address, 'UTF-8'), 'UTF-8')"))
    .otherwise(col("shipper_address"))
)

The same transformation is applied to consignee_address_normalized.

The agent rebuilds and resubmits one final time.
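Unsupported charsets of this kind can also be found statically by scanning source text for charset literals. The sketch below is a simplified, hypothetical pre-flight check, not part of the agent. It uses a plain regular expression rather than a SQL parser, and it assumes charset names are matched case-insensitively, because 'UTF-8' is accepted in the code above.

```python
import re

# Charsets listed as accepted in the Spark 4.0 error message above.
SUPPORTED_CHARSETS = {
    "iso-8859-1", "us-ascii", "utf-16", "utf-16be", "utf-16le", "utf-32", "utf-8",
}

# Matches a trailing quoted argument such as: , 'Shift_JIS')
_CHARSET_ARG = re.compile(r",\s*'([^']+)'\s*\)")


def find_unsupported_charsets(source: str) -> list:
    """Return (line number, charset) pairs for encode/decode calls with unsupported charsets."""
    findings = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        if "encode(" not in line and "decode(" not in line:
            continue  # only inspect lines that call encode() or decode()
        for charset in _CHARSET_ARG.findall(line):
            if charset.lower() not in SUPPORTED_CHARSETS:
                findings.append((lineno, charset))
    return findings


sample = (
    "expr(\"decode(encode(shipper_address, 'Shift_JIS'), 'UTF-8')\")\n"
    "expr(\"decode(encode(shipper_address, 'UTF-8'), 'UTF-8')\")\n"
    "expr(\"decode(encode(shipper_address, 'EUC-KR'), 'UTF-8')\")\n"
)
print(find_unsupported_charsets(sample))
```

A scan like this only sees literal charset names. Charsets passed through variables or configuration would still surface only at runtime.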

2.6.5 Final submission: success

The fifth submission completes successfully:

{"success": true, "message": "EMR SERVERLESS job completed successfully",
"compute_run_id": "<JOB-RUN-ID>", "status": "SUCCESS",
"application_type": "EMR-Serverless"}

The three pipelines (Fleet Management, International Shipping, and Historical Compliance) complete on EMR Serverless with the emr-spark-8.0-preview release label (Spark 4.0.1).

2.7 Summary of the iterative runtime validation

The runtime validation loop is the core value of the upgrade agent. Here’s the complete iteration history:

Table showing the four validation iterations with error types and fixes applied

Each iteration follows the same cycle:

Diagram showing the submit, diagnose, fix, rebuild, and resubmit cycle

Failures that would normally require manual log analysis, root cause investigation, and code patching are resolved automatically by the agent in this workflow.
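The shape of this cycle can be sketched as a small loop. This is a simulation under stated assumptions, not the agent's code: `submit` and `apply_fix` are hypothetical callables standing in for the agent's tools, and the error labels are shorthand for the four failures in this walkthrough.

```python
from typing import Callable, Optional


def run_validation_loop(
    submit: Callable[[], Optional[str]],
    apply_fix: Callable[[str], bool],
    max_iterations: int = 10,
) -> list:
    """Submit, diagnose, fix, and resubmit until the job succeeds.

    submit returns None on success or an error label on failure.
    apply_fix returns True when it knows how to fix that error.
    """
    errors_fixed = []
    for _ in range(max_iterations):
        error = submit()
        if error is None:
            return errors_fixed  # job succeeded
        if not apply_fix(error):
            raise RuntimeError(f"no known fix for {error}")
        errors_fixed.append(error)
    raise RuntimeError("iteration limit reached before the job succeeded")


# Simulated application with four outstanding incompatibilities (illustrative labels).
pending = [
    "REMOVED_SQL_CONFIG",
    "CODEC_NOT_AVAILABLE",
    "MALFORMED_CHARACTER_CODING",
    "INVALID_PARAMETER_VALUE.CHARSET",
]


def fake_submit() -> Optional[str]:
    return pending[0] if pending else None  # the first remaining issue fails the run


def fake_fix(error: str) -> bool:
    pending.remove(error)
    return True


history = run_validation_loop(fake_submit, fake_fix)
print(len(history))  # 4 fixes, so the fifth submission succeeds
```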

3. Data quality validation

With both the source baseline (Section 2.5) and the upgraded target run (Section 2.6) completed successfully, the agent performs data quality validation to verify the migration hasn’t changed your application’s output. This is the key advantage of including the source application in your upgrade prompt: the agent can compare outputs from both Spark versions side by side.

3.1 Data quality comparison

The agent invokes get_data_quality_summary to compare the outputs across four dimensions:

  • Schema validation: Confirms column names, data types, and column ordering match between source and target outputs.
  • Row count validation: Verifies no data loss or duplication during migration.
  • Nullability validation: Detects changes in null handling.
  • Statistical summary validation: Compares numeric and string column distributions (min, max, mean, count, distinct values).
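To make the four dimensions above concrete, here is a minimal sketch that applies them to small in-memory row sets. It is not how get_data_quality_summary is implemented. `compare_outputs` is a hypothetical helper, the sample rows are made up, and it assumes every row in a set has the same columns.

```python
from statistics import mean


def compare_outputs(source: list, target: list) -> dict:
    """Compare two lists of row dicts; True means the check found no difference."""

    def schema(rows):
        # Ordered column names with the type name of the first non-null value.
        cols = list(rows[0].keys()) if rows else []
        return [
            (c, next((type(r[c]).__name__ for r in rows if r[c] is not None), "null"))
            for c in cols
        ]

    def null_counts(rows):
        return {c: sum(r[c] is None for r in rows) for c, _ in schema(rows)}

    def stats(rows):
        out = {}
        for c, _ in schema(rows):
            values = [r[c] for r in rows if r[c] is not None]
            summary = {"count": len(values), "distinct": len(set(values))}
            if values:
                summary["min"], summary["max"] = min(values), max(values)
                if all(isinstance(v, (int, float)) for v in values):
                    summary["mean"] = mean(values)
            out[c] = summary
        return out

    return {
        "schema": schema(source) == schema(target),
        "row_count": len(source) == len(target),
        "nullability": null_counts(source) == null_counts(target),
        "statistics": stats(source) == stats(target),
    }


# Made-up rows: the target keeps characters that the source replaced with "?".
source_rows = [{"id": 1, "shipper_address": "T?ky?"}, {"id": 2, "shipper_address": "Osaka"}]
target_rows = [{"id": 1, "shipper_address": "Tōkyō"}, {"id": 2, "shipper_address": "Osaka"}]
result = compare_outputs(source_rows, target_rows)
print(result)
```

In this toy data, only the statistics check reports a difference, because the maximum string value of shipper_address changes while the schema, row count, and null counts stay the same.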

The agent presents the comparison results:

Data quality summary showing schema, row count, and nullability checks passing with a statistical mismatch in shipper_address

The preceding image shows the data quality summary.

Three of four checks pass cleanly. The statistical summary validation detects a mismatch in the shipper_address column of the customs_declarations output: the max and min summary values differ between source and target.

3.2 Understanding and resolving the mismatch

This mismatch is a direct consequence of Fix 4 (Section 2.6.4). The original code ran addresses through a Shift_JIS/GB2312/EUC-KR → UTF-8 roundtrip that produced garbled text, because the intermediate regional charset corrupted multi-byte UTF-8 characters. The upgraded code uses UTF-8 → UTF-8, preserving addresses faithfully. The mismatch reflects improved data quality, not a regression.
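The effect of a mismatched roundtrip is easy to reproduce in plain Python. The snippet below is an analogy rather than the Spark code path: it uses Python's codecs with `errors="replace"` and a made-up address, so the exact garbled characters may differ from what Spark 3.x produced.

```python
address = "東京都港区"

# Old path: encode with a regional charset, then decode those bytes as UTF-8.
garbled = address.encode("shift_jis").decode("utf-8", errors="replace")

# New path: encode and decode with the same charset, which is lossless.
preserved = address.encode("utf-8").decode("utf-8")

print(garbled == address)    # False
print(preserved == address)  # True
```

Because the old and new outputs contain different strings, summary values such as min and max for the column differ even though the row count is unchanged.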

Schema, row counts, and nullability matched exactly: the difference is limited to string values that were previously garbled. No further action is needed. The upgraded application is production-ready.

Expected behavior: Character encoding migrations might change string values, although they preserve semantic meaning. When data quality validation reports mismatches, trace each one back to a specific code change. If the mismatch is explained by a required migration fix (as here), verify the new behavior is correct and document it. If a mismatch cannot be explained, investigate before promoting to production.

4. Upgrade summary

After the agent completes the entire upgrade workflow, it produces a comprehensive upgrade summary following a structured template. This summary lets you review the job configuration updates, code modifications with diffs and file references, relevant migration rules applied, and data quality validation status.

Here is the summary the agent produced for this upgrade:

Upgrade plan

  • Compile and build project with current Spark 3.5.0: validated that Python files compile successfully.
  • Run baseline validation on source EMR Serverless (00g4vhvt1lhtrs09) with Spark 3.5.0: established data quality baseline.
  • Run target validation on target EMR Serverless (00g4vhvt3np1bj09) with Spark 4.0.1: fixed 4 issues iteratively across 4 validation attempts.
  • Compare data quality between source and target runs: detected expected mismatch in shipper_address.
  • Generate and persist upgrade summary.

Upgrade result

Upgrade completed with data validation enabled. Data validation detected an expected mismatch in the shipper_address column because of the charset encoding migration from unsupported charsets (Shift_JIS, GB2312, EUC-KR) to UTF-8.

Dependency changes

No external dependencies were changed in this project (no requirements.txt).

Job configuration changes

  • Parquet datetime rebase configuration key renamed.
    • Change: spark.sql.legacy.parquet.datetimeRebaseModeInWrite → spark.sql.parquet.datetimeRebaseModeInWrite.
    • Migration rule: In Spark 4.0, the legacy datetime rebasing SQL configurations with the prefix spark.sql.legacy are removed. The SQL configuration spark.sql.legacy.parquet.datetimeRebaseModeInWrite was removed in the version 4.0.0. Use spark.sql.parquet.datetimeRebaseModeInWrite instead.
  • Legacy coding error action enabled.
    • Change: Added spark.sql.legacy.codingErrorAction set to true.
    • Migration rule: In Spark 4.0, the encode() and decode() functions raise MALFORMED_CHARACTER_CODING error when handling unmappable characters. In Spark 3.5 and earlier versions, these characters are replaced with garbled text. To restore the previous behavior, set spark.sql.legacy.codingErrorAction to true.

Code changes

  • Validation attempt 1: Legacy Parquet configuration key.
    • Validation run: EMR-Serverless job_run_id 00g4vm14v118vg0b.
    • Error: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInWrite' was removed in the version 4.0.0.
    • Applied changes: src/utils/spark_config.py: Changed .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY") to .config("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY").
  • Validation attempt 2: Parquet compression codec.
    • Validation run: EMR-Serverless job_run_id 00g4vm5pm1hig00b.
    • Error: [CODEC_NOT_AVAILABLE.WITH_AVAILABLE_CODECS_SUGGESTION] The codec lz4raw is not available.
    • Applied changes: src/domain/fleet_management/telemetry_processor.py: Changed .option("compression", "lz4raw") to .option("compression", "lz4_raw").
  • Validation attempt 3: Stricter charset encoding.
    • Validation run: EMR-Serverless job_run_id 00g4vm8sh4sp0g0b.
    • Error: [MALFORMED_CHARACTER_CODING] Invalid value found when performing encode with ISO-8859-1.
    • Applied changes: src/utils/spark_config.py: Added .config("spark.sql.legacy.codingErrorAction", "true") to the SparkSession builder.
  • Validation attempt 4: Unsupported charsets.
    • Validation run: EMR-Serverless job_run_id 00g4vmc668ng6o0b.
    • Error: [INVALID_PARAMETER_VALUE.CHARSET] charset in encode is invalid: expects one of iso-8859-1, us-ascii, utf-16, utf-16be, utf-16le, utf-32, utf-8, but got Shift_JIS.
    • Applied changes: src/domain/international_shipping/shipment_processor.py: Replaced Shift_JIS, GB2312, EUC-KR with UTF-8 for shipper and consignee address encoding.

Data validation result

# | Validation | Status
1 | Schema validation (column names, types, ordering) | Passed (no difference)
2 | Row count validation (no data loss) | Passed (no difference)
3 | Nullability validation (null handling changes) | Passed (no difference)
4 | Statistical summary validation (numeric/string distributions) | Failed (with difference)

Data mismatch: 1. The shipper_address column max summary value changed in customs_declarations output. This is expected because of the charset encoding migration from Shift_JIS/GB2312/EUC-KR to UTF-8. 2. The shipper_address column min summary value changed in customs_declarations output for the same expected cause.

5. Conclusion

The AWS Spark Upgrade Agent turns a traditionally time-consuming PySpark migration into an automated, iterative workflow. For the Global Logistics Platform sample, the agent identified and resolved four distinct Spark 4.0 breaking changes: legacy Parquet configuration key removal, compression codec renames, stricter charset encoding validation, and character encoding restrictions. Each fix was applied across three domain processors, through natural language interaction in the IDE.

Every incompatibility was discovered through runtime validation on Amazon EMR Serverless. The agent submitted the unmodified application to the target application, and each failure revealed the next breaking change:

  • The spark.sql.legacy.parquet.datetimeRebaseModeInWrite configuration removal, which crashes SparkSession initialization.
  • The lz4raw → lz4_raw codec rename, which fails when Parquet writes run.
  • ISO-8859-1 encoding of CJK characters: ISO-8859-1 is a valid Spark 4.0 charset, so the failure surfaces only when the code runs against real multi-language data, because Spark 4.0 tightened charset encoding validation to reject unmappable characters.
  • Shift_JIS/GB2312/EUC-KR charsets removed from Spark 4.0’s supported charset list entirely.

The agent diagnosed each error from Amazon CloudWatch logs, applied the fix, rebuilt, and resubmitted without manual intervention beyond approving each step. The data quality validation then confirmed that the upgraded application produces equivalent output on Spark 4.0.1: schema, row counts, and nullability matched exactly. The one difference, in the shipper_address column, resulted from the charset migration from regional encodings to UTF-8, which actually improved data quality by eliminating garbled text from incorrect encoding roundtrips. With each mismatch traced back to a specific, understood code change, the upgraded application is production-ready.

# | Category | Spark 3.x behavior | Spark 4.0 change | Agent fix
1 | Parquet datetime configuration | spark.sql.legacy.parquet.datetimeRebaseModeInWrite | legacy. prefix removed from key name | Update configuration key
2 | Parquet compression | lz4raw codec name | Renamed to lz4_raw (with underscore) | Update codec name
3 | Charset + CJK data | ISO-8859-1 silently replaced unmappable CJK chars with ? | Stricter charset validation throws MALFORMED_CHARACTER_CODING for unmappable characters | Add spark.sql.legacy.codingErrorAction=true
4 | Character encoding | encode()/decode() supported Java charsets | Restricted to 7 standard charsets | Replace unsupported charsets with UTF-8

Next steps after your first upgrade:

  1. Apply the agent to your production PySpark code base.
  2. Integrate the upgrade workflow into your CI/CD pipeline.
  3. Explore Scala application upgrades (see Part 3 of this series).

To get started with your own PySpark migration:

  • Deploy the AWS CloudFormation templates from Section 1.2 for one-time AWS IAM, Amazon S3, and Amazon EMR Serverless setup.
  • Configure the spark-upgrade MCP server in your MCP-compatible IDE.
  • Point the agent at your PySpark project and let it handle the rest.

For more information, see the Amazon EMR Serverless documentation, the Apache Spark 4.0 migration guide, and the AWS Spark Upgrade Agent setup guide.

6. Clean up resources

To avoid ongoing costs, delete the resources you created:

  1. Delete the Amazon EMR Serverless stack:
    aws cloudformation delete-stack --stack-name spark-emr-serverless-upgrade --region ${SMUS_MCP_REGION}

  2. If the Amazon S3 staging bucket contains objects, empty it before deleting the staging stack:
    aws s3 rm s3://${STAGING_BUCKET_PATH} --recursive

  3. Delete the AWS IAM and Amazon S3 staging stack:
    aws cloudformation delete-stack --stack-name spark-upgrade-mcp-setup --region ${SMUS_MCP_REGION}


About the authors

Prasad Nadig

Prasad Nadig is a Senior Analytics Specialist Solutions Architect at AWS, specializing in data and AI, including data lakes, data warehousing, and analytics services such as Amazon Redshift, Amazon EMR, and AWS Glue. He helps customers architect, migrate, and modernize their data and analytics workloads to achieve scalable, performant, and cost-effective solutions on AWS.

Karthik Prabhakar

Karthik is a Data Processing Engines Architect for Amazon EMR at Amazon Web Services (AWS). He specializes in distributed systems architecture and query optimization, working with customers to solve complex performance challenges in large-scale data processing workloads. His focus spans engine internals, cost-optimization strategies, and architectural patterns that enable customers to run petabyte-scale analytics efficiently.

Bezuayehu Wate

Bezuayehu is a Specialist Solutions Architect at AWS, specializing in big data analytics and AI solutions. She works closely with customers to modernize analytics platforms using AWS data and AI services. With a passion for emerging technologies and customer success, she thrives on designing innovative cloud solutions that deliver measurable business impact and drive organizational transformation.

Chuhan Liu

Chuhan is a Software Development Engineer at AWS.

Keerthi Chadalavada

Keerthi is a Senior Software Development Engineer in the AWS analytics organization. She focuses on combining generative AI and data integration technologies to design and build comprehensive solutions for customer data and analytics needs.

Pradeep Patel

Pradeep is a Sr. Software Engineer at AWS Glue. He is passionate about helping customers solve their problems by using the power of the AWS Cloud to deliver highly scalable and robust solutions. In his spare time, he loves to hike and play with web applications.

Announcing Spark Connect on Amazon EMR Serverless: Interactive PySpark development, anywhere

Post Syndicated from Al MS original https://aws.amazon.com/blogs/big-data/announcing-spark-connect-on-amazon-emr-serverless-interactive-pyspark-development-anywhere/

Today, AWS is announcing support for Spark Connect on Amazon EMR Serverless with EMR release 7.13 (Apache Spark 3.5.6) and later versions. You can now build and debug Spark applications from your preferred local environment while running full-scale Spark operations on EMR Serverless.

Previously, code that worked on a local machine might break in production because of environment mismatches, dependency conflicts, or unexpected data patterns. The only way to catch it was a deploy-and-check cycle. With the Spark Connect feature, you can develop Spark code from a supported local environment, such as an IDE (for example, VS Code or PyCharm), Jupyter notebooks, Amazon SageMaker Unified Studio (SMUS) Data Notebooks, Amazon Q Developer, or Kiro. There are no clusters to provision, no code to repackage, and no deploy-and-check loop. Your local Python session can stay local as usual while Spark operations are automatically routed to a remote Spark server for execution.

Each Spark Connect session has its own AWS resource with a unique ARN, enabling per-session AWS Identity and Access Management (AWS IAM) permissions, tag-based cost allocation, audit through AWS CloudTrail, and session-specific configuration overrides. This gives teams finer control over who runs what, at what cost. You also get real-time visibility through the Spark UI, persistent session history, and a dedicated interface to monitor and manage active and completed sessions.

For more details, visit the EMR Serverless release notes or the EMR Serverless Developer Guide. For a quick look at the experience, here’s a demonstration of using Spark Connect in Amazon SageMaker Unified Studio Data Notebooks:

For a runnable end-to-end example, try the EMR Serverless Spark Connect sample notebook from your local IDE. See the following demonstration:

How Spark Connect works

Spark Connect uses a client-server architecture that separates application code from the Spark engine. The client, a lightweight PySpark library running in your local environment, sends Spark operations over a secure gRPC/TLS connection to a Spark Connect server running on EMR Serverless. The server runs those operations on EMR Serverless compute and returns the results to your local session.

Spark Connect client-server architecture showing a local IDE connecting to a Spark Connect server on EMR Serverless

Your local machine doesn’t need Spark installed, doesn’t need direct access to the data, and doesn’t need to be sized for the workload. Because the client is a compact library, you can embed Spark operations in any Python application that can use PySpark, including web services, dashboards, and automation scripts. For example, a development team can add Spark-powered analytics directly into a FastAPI backend or a Streamlit dashboard, treating Spark like a database driver rather than a separate batch system. These capabilities extend Spark Connect use cases beyond traditional notebook and IDE development. Because the compute-intensive processing happens on the EMR Serverless side, you can keep using pandas, matplotlib, and your team’s internal Python libraries on your laptop or in your embedded clients without installing those libraries on EMR Serverless.

With Spark Connect server sessions running on EMR Serverless, you pay for compute only while your session is active. EMR Serverless automatically scales compute up and down based on workload demands through dynamic resource allocation (DRA), eliminating the need to predict capacity ahead of time. If your team runs Spark Connect sessions regularly, you can configure pre-initialized capacity on your EMR Serverless application for faster session startup times. Your Spark Connect sessions also have access to the full suite of EMR Serverless features, including AWS Graviton processors for cost optimization, secure VPC connectivity to your data sources, custom images, and integrated observability through Amazon CloudWatch and the Spark UI.

Getting started

Getting started with Spark Connect on EMR Serverless takes three steps: create an application, start a session, and connect from your IDE.

Note: The resources created in this quick start incur charges while active. Make sure to follow the cleanup steps at the end of this tutorial to avoid ongoing charges.

Prerequisites

  • In addition to the required job runtime IAM role, the caller needs these permissions: emr-serverless:StartSession, GetSession, GetSessionEndpoint, TerminateSession, GetResourceDashboard, and iam:PassRole on the runtime role.
  • An existing EMR Serverless application running emr-7.13.0 or later, with interactiveConfiguration.sessionEnabled = true.
  • boto3 version 1.43.0 or later to access the latest EMR Serverless session APIs.
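The permissions in the first prerequisite could be collected into an IAM policy document along the following lines. This is a sketch under assumptions, not an official policy: it assumes that every session action uses the emr-serverless: prefix, it defaults the resource scope to "*" where you would normally narrow it, and `session_policy` and the example role ARN are placeholders.

```python
import json


def session_policy(runtime_role_arn: str, session_resource: str = "*") -> dict:
    """Build an illustrative IAM policy for starting and managing Spark Connect sessions."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "SparkConnectSessions",
                "Effect": "Allow",
                "Action": [
                    "emr-serverless:StartSession",
                    "emr-serverless:GetSession",
                    "emr-serverless:GetSessionEndpoint",
                    "emr-serverless:TerminateSession",
                    "emr-serverless:GetResourceDashboard",
                ],
                "Resource": session_resource,  # assumption: narrow this in real use
            },
            {
                "Sid": "PassRuntimeRole",
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": runtime_role_arn,
            },
        ],
    }


policy = session_policy("arn:aws:iam::123456789012:role/EMRServerlessSessionRole")
print(json.dumps(policy, indent=2))
```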

Step 1: Create an EMR Serverless application with Spark Connect enabled

Amazon EMR Serverless application creation page in the EMR console

  • Open the Amazon EMR console and navigate to EMR Serverless.
  • Choose Get started. A pop-up appears. Choose Create and launch EMR Studio.
  • This takes you to the Create application page.
  • Enter a Name for your application (for example, spark-connect-app).
  • For Type, select Spark.
  • For Release version, select emr-7.13.0 or later.
  • For Architecture, choose x86_64 (default). This is compatible with most third-party tools and libraries.
  • Under Application setup options, select Use default settings for interactive workloads. This automatically sets interactiveConfiguration.sessionEnabled = true.
  • Choose Create and start application.

Alternatively, use the AWS CLI:

# Create an application with Spark Connect enabled
APP_ID=$(aws emr-serverless create-application \
  --type "SPARK" \
  --name "spark-connect-app" \
  --release-label emr-7.13.0 \
  --interactive-configuration '{"sessionEnabled": true}' \
  --query 'applicationId' \
  --output text)
echo "Created application: $APP_ID"
# Start the application
aws emr-serverless start-application --application-id "$APP_ID"

Step 2: Start a session

Next, start a session and obtain the Spark Connect endpoint.

Provide an IAM execution role that grants the session access to your data, such as reading data from an Amazon S3 bucket or querying the AWS Glue Data Catalog. This is the same type of role used for EMR Serverless batch jobs.

# Start a session with your execution role
ROLE_ARN="YOUR_ROLE" # example: arn:aws:iam::123456789012:role/EMRServerlessSessionRole
SESSION_ID=$(aws emr-serverless start-session \
  --application-id "$APP_ID" \
  --execution-role-arn "$ROLE_ARN" \
  --query sessionId \
  --output text)

# Get the session endpoint
aws emr-serverless get-session-endpoint \
  --application-id $APP_ID \
  --session-id $SESSION_ID

The get-session-endpoint response includes a secure endpoint URL and an authentication token. All communication between your local environment and EMR Serverless is encrypted using TLS. Treat the token as a sensitive credential. Consider using AWS Secrets Manager to store and retrieve tokens programmatically. The authentication token is time-limited to 1 hour, so for long-running sessions we recommend that you refresh it periodically.
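One way to handle the 1-hour token lifetime is to cache the token and fetch a new one shortly before it expires. The sketch below illustrates that idea only. `EndpointTokenCache` and `fetch_token` are hypothetical names, where `fetch_token` stands for any function of yours that calls get-session-endpoint and returns the token. How a refreshed token is applied to an already open connection is outside this sketch.

```python
import time
from typing import Callable, Optional


class EndpointTokenCache:
    """Caches an auth token and fetches a new one shortly before it expires."""

    def __init__(
        self,
        fetch_token: Callable[[], str],
        ttl_seconds: float = 3600.0,     # token lifetime stated above (1 hour)
        refresh_margin: float = 300.0,   # refresh 5 minutes early (an arbitrary choice)
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch_token = fetch_token
        self._ttl = ttl_seconds
        self._margin = refresh_margin
        self._clock = clock
        self._token: Optional[str] = None
        self._fetched_at = 0.0

    def get(self) -> str:
        """Return a cached token, fetching a new one when it is missing or near expiry."""
        age = self._clock() - self._fetched_at
        if self._token is None or age >= self._ttl - self._margin:
            self._token = self._fetch_token()
            self._fetched_at = self._clock()
        return self._token


# Simulated clock and token source instead of a real get-session-endpoint call.
now = [0.0]
issued = []


def fake_fetch() -> str:
    issued.append(f"token-{len(issued) + 1}")
    return issued[-1]


cache = EndpointTokenCache(fake_fetch, clock=lambda: now[0])
first = cache.get()
now[0] = 1000.0
second = cache.get()   # still within the lifetime, so the token is reused
now[0] = 3400.0
third = cache.get()    # inside the refresh margin, so a new token is fetched
print(first, second, third)
```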

Step 3: Connect from your local IDE

Use the returned endpoint URL and authentication token to connect to the Spark Connect server.

The connection URL uses the sc:// protocol, which is the Spark Connect standard. The use_ssl=true parameter enables encrypted communication over TLS, so your data and credentials are protected in transit.

from pyspark.sql import SparkSession

# Use the endpoint and auth token from get-session-endpoint
session_endpoint = "<endpoint-from-get-session-endpoint>"
auth_token = "<authToken-from-get-session-endpoint>"

spark_connect_url = (
    f"sc://{session_endpoint}:443/;use_ssl=true;x-aws-proxy-auth={auth_token}"
)

spark = SparkSession.builder \
    .remote(spark_connect_url) \
    .getOrCreate()

# Query data in your S3 data lake
df = spark.sql("SELECT * FROM my_catalog.my_database.my_table")
df.show()

# Run transformations at scale
df.groupBy("category").count().orderBy("count", ascending=False).show()
spark.stop()

Once connected, the Spark operations you write in your IDE run on EMR Serverless. For debugging, you can pause execution at breakpoints, inspect variables, and step through your transformations locally while EMR Serverless processes your data on remote, scalable infrastructure.

Sessions remain active for a configurable idle timeout (1 hour by default). If your connection drops, the session continues running, allowing you to reconnect without losing your work. You can also access the live Spark UI through the GetResourceDashboard API to monitor queries, stages, and executors in real time. After the session ends, the Spark History Server remains available for post-run analysis.

Clean up resources

If the 1-hour session idle timeout does not meet your needs, you can manually terminate sessions to avoid ongoing costs. Note that terminating an active session immediately stops your running Spark operations. Before you do that, verify that all critical data processing has completed and that results are saved.

# 1. Stop the active session
aws emr-serverless terminate-session \
  --application-id $APP_ID \
  --session-id $SESSION_ID

# 2. Stop the application
aws emr-serverless stop-application --application-id $APP_ID

Use cases

Spark Connect on EMR Serverless supports a wide range of development workflows. The following are some of the most popular use cases:

  • Interactive ETL development — Build and test data pipelines interactively, validating transformations against full-scale datasets before promoting them to production as batch jobs.
  • SageMaker Unified Studio (SMUS) Data Notebooks — Run interactive PySpark sessions directly from SMUS Data Notebooks connected to EMR Serverless through Spark Connect.
  • Direct S3 and JDBC access without a catalog — Connect directly to S3 files and JDBC data sources without needing a metastore or catalog configuration.
  • Apache Iceberg Data Lakehouse analytics — Query and manage Iceberg tables through the AWS Glue Data Catalog, with full support for time travel, schema evolution, and partition management.
  • Amazon S3 Tables with federated catalog — Access S3 Tables as a federated Glue Data Catalog source, combining Iceberg features with serverless Spark execution.
  • dbt-spark — Run dbt-spark adapter against EMR Serverless via Spark Connect, allowing analytics engineers to develop and test transformations locally with dbt framework while using EMR Serverless as the remote Spark engine.
  • Exploratory data analysis and feature engineering — Analyze production-scale data from your preferred notebook environment instead of using sampled subsets, helping you catch data quality issues earlier.
  • Compute standardization — Standardize EMR Serverless as the Spark backend while giving you the flexibility to use preferred local tools, version control, and CI/CD workflows.

These use cases work across multiple client surfaces: IDEs, Jupyter notebooks, dbt-spark, and AI coding agents. Because Spark Connect is an open Apache Spark standard, the same PySpark code typically works across different Spark backends by changing the connection endpoint.

Availability and pricing

Spark Connect on EMR Serverless is now available with Apache Spark 3.5.6 on Amazon EMR release 7.13 and higher in all AWS Regions where EMR Serverless is available. There is no additional charge for using Spark Connect. You pay for the EMR Serverless compute resources (vCPU, memory, and storage) consumed during your session, the same pricing model as EMR Serverless batch jobs.

Conclusion

Spark Connect on EMR Serverless bridges the gap between local development and production-scale execution. Build and debug PySpark applications from your preferred environment (IDE, notebook, dbt, or AI coding agent) while EMR Serverless handles automatic scaling, per-session cost visibility, and infrastructure management behind the scenes. With ARN-addressable sessions, fine-grained IAM permissions, tag-based cost allocation, and per-session configuration overrides, your team gets the controls they need without sacrificing flexibility.

Get started today with EMR release 7.13.0 (Spark 3.5.6). Follow the step-by-step tutorial in the EMR Serverless Developer Guide to create your first Spark Connect session and experience interactive, serverless PySpark development firsthand.


About the authors

Al MS

Al is a product manager for Amazon EMR at AWS.

Melody Yang

Melody Yang is a Principal Analytics Specialist Solution Architect at AWS with expertise in Big Data technologies. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interests are open-source frameworks and automation, data engineering and DataOps.

KiKi Nwangwu

Kiki is an Analytics and GenAI Specialist Solutions Architect at AWS. She specializes in helping customers architect, build, and modernize scalable data analytics and generative AI solutions. She enjoys traveling and exploring new cultures.

Build stateful streaming applications with Apache Spark 4.0 on Amazon EMR Serverless

Post Syndicated from Raj Ramasubbu original https://aws.amazon.com/blogs/big-data/build-stateful-streaming-applications-with-apache-spark-4-0-on-amazon-emr-serverless/

Apache Spark 4.0 represents a major milestone in stream processing, introducing new capabilities that fundamentally change how developers build stateful streaming applications. At the heart of these improvements is the transformWithState API, which brings first-class support for timers, automatic state management, and schema evolution to Spark Structured Streaming.

With Spark 4.0 now available on Amazon EMR Serverless, developers can build stateful streaming applications using the transformWithState API in a fully managed, serverless environment that automatically scales based on workload demands. This combination delivers the power of sophisticated stream processing without the operational overhead of cluster management.

In this post, we demonstrate how to build a production-ready IoT device monitoring system using Spark 4.0’s transformWithState API on Amazon EMR Serverless. This example showcases the key capabilities of stateful streaming and provides a template you can adapt for your own use cases.

Apache Spark 4.0: introducing transformWithState

Apache Spark 4.0’s latest streaming features solve common production challenges in stateful applications by introducing native timer support and advanced state management capabilities for complex event processing workflows. The new transformWithState API provides:

Key features of transformWithState

  • Native timer support: Register timers that fire callbacks at specific times for use cases like heartbeat monitoring, session timeout detection, and SLA violation alerts.
  • Automatic state TTL (Time-To-Live): Configure automatic expiration policies to prevent state from growing indefinitely. This is useful for use cases like session state size control, clearing stale device telemetry, maintaining a recency cache, or tracking invalid logins within the last hour for fraud detection.
  • Schema evolution: Evolve state schema without restarting from a new checkpoint. Add optional fields, remove fields, or widen numeric types. This is particularly valuable for use cases where data structures are dynamic, and application downtime for schema migration is not acceptable, enabling more resilient and flexible real-time streaming applications.
  • Multiple state variables: Support for multiple independent state variables (ValueState, ListState, MapState) per key, well-suited for building complex, real-time applications that require sophisticated state management, such as storing a history of recent error codes, tracking counts of various alert types, or maintaining multiple dimensions of user activity within a single stateful operator.
  • State observability: Query application state mid-stream using the State Data Source Reader for debugging and monitoring. This is especially valuable in applications that require maintaining and evolving state through several steps, such as detection of sophisticated event patterns across multiple streams and over time, where visibility into state transitions is critical for troubleshooting and validation.
  • Operator chaining: Chain multiple stateful operators together for complex multi-stage processing pipelines.

These capabilities make Spark 4.0 ideal for applications that were previously difficult or impossible to implement efficiently, such as complex event processing, session analytics, anomaly detection, and real-time monitoring systems.

Use case: IoT heartbeat monitoring

Consider a fleet of 100,000 IoT sensors deployed across manufacturing facilities. Each sensor sends a heartbeat signal every 20 seconds to indicate it’s operational. Your operations team needs to be alerted within 30 seconds if any sensor goes offline, with repeat alerts every 60 seconds until the sensor comes back online.

This seemingly simple requirement presents several technical challenges. The application must maintain the last heartbeat timestamp for each of the 100,000 devices while independently managing timers to detect missed signals per device. It also needs to handle out-of-order heartbeats caused by network delays and clean up state for decommissioned devices to prevent unbounded memory growth. All of this must happen at scale, processing millions of events per minute with low latency, while recovering gracefully from failures without losing state.

To address the challenges of IoT heartbeat monitoring described above, we present a solution built on the transformWithState API in Spark 4.0. Its native timer support, automatic state management, and built-in fault tolerance make it well suited to IoT heartbeat monitoring at scale.

Solution overview

Our solution architecture follows a serverless, event-driven design:

Solution architecture showing IoT devices sending heartbeats to Kinesis Data Streams, processed by EMR Serverless with transformWithState, checkpointed to Amazon S3, and alerts delivered via Amazon SNS

  1. IoT devices send heartbeat events to Amazon Kinesis Data Streams containing device ID, timestamp, and metadata (battery level, signal strength, firmware version).
  2. Amazon EMR Serverless reads from Kinesis using the Spark aws-kinesis connector through a VPC endpoint for Kinesis, then parses the JSON events into structured DataFrames and groups them by device_id.
  3. transformWithState processes each device’s stream. On heartbeat arrival, it updates state and registers a 30-second timer; when the timer expires without a new heartbeat, it emits an offline alert.
  4. State is automatically persisted to RocksDB locally and checkpointed to Amazon Simple Storage Service (Amazon S3), enabling fault-tolerant recovery and exactly-once processing semantics.
  5. Alerts are delivered via Amazon Simple Notification Service (Amazon SNS) to configured subscribers (email, SMS, AWS Lambda, webhooks).
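To make the event in step 1 concrete, the following sketch builds and parses a single heartbeat in plain Python. The field names follow the sample payload used in the testing section of this post; the Heartbeat dataclass and the helper functions are illustrative names, not part of the solution script.

```python
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone


@dataclass
class Heartbeat:
    """One heartbeat event (hypothetical container; fields mirror the sample payload)."""
    device_id: str
    timestamp: str          # ISO 8601 UTC string, for example 2025-01-01T00:00:20Z
    battery_level: float
    signal_strength: float
    firmware_version: str


def encode_heartbeat(hb: Heartbeat) -> bytes:
    """Serialize a heartbeat to the JSON bytes that would be written to the stream."""
    return json.dumps(asdict(hb)).encode("utf-8")


def decode_heartbeat(data: bytes) -> Heartbeat:
    """Parse JSON bytes back into a Heartbeat, similar in spirit to from_json in Spark."""
    return Heartbeat(**json.loads(data.decode("utf-8")))


def parse_event_time(hb: Heartbeat) -> datetime:
    """Convert the ISO 8601 'Z' timestamp into a timezone-aware datetime."""
    parsed = datetime.strptime(hb.timestamp, "%Y-%m-%dT%H:%M:%SZ")
    return parsed.replace(tzinfo=timezone.utc)


event = Heartbeat("device-001", "2025-01-01T00:00:20Z", 87.5, -42.3, "v2.1.0")
round_trip = decode_heartbeat(encode_heartbeat(event))
```

Keeping the timestamp as a UTC string on the wire and converting it only when needed avoids depending on the sender's local time zone.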

Prerequisites

Before implementing this solution, verify that you have:

  1. AWS account: With permissions for EMR Serverless, Kinesis, SNS, S3, VPC, and IAM.
  2. AWS Command Line Interface (AWS CLI): Configured with appropriate credentials.
  3. VPC setup: VPC with private subnets and security groups configured.
  4. Kinesis VPC interface endpoint: VPC endpoint for private connectivity to Kinesis.
  5. Kinesis Data Stream: Created for ingesting heartbeat events (for example, iot-heartbeats). For testing your streaming data solution, refer to Test your streaming data solution with the new Amazon Kinesis Data Generator.
  6. SNS topic: Created for sending alerts (for example, iot-alerts).
  7. S3 bucket: For storing application code, dependencies, and checkpoints.

Step-by-step implementation

The following steps walk you through setting up an EMR Serverless application with Spark 4.0, configuring the stateful streaming processor, and deploying the IoT heartbeat monitoring solution.

Step 1: Create the EMR Serverless application

Run the following command in your terminal using the AWS CLI. Replace the subnet and security group IDs with the values from your VPC setup.

# Create EMR Serverless application with Spark 4.0 and VPC
aws emr-serverless create-application \
  --name "iot-heartbeat-monitor" \
  --release-label "emr-spark-8.0.0" \
  --type "SPARK" \
  --network-configuration '{
    "subnetIds": ["subnet-xxxxx", "subnet-yyyyy"],
    "securityGroupIds": ["sg-zzzzz"]
  }' \
  --region us-east-1

The command returns a JSON response containing the application details. Note the applicationId value from the output, as you will need it in subsequent steps.

Step 2: Implement the heartbeat monitor

The core of our solution is the HeartbeatMonitor class that extends StatefulProcessor. This class demonstrates the key features of Spark 4.0’s transformWithState API. Download the full implementation script and upload it to your S3 bucket for execution. Let’s walk through each component to understand how it works.

2.1 Initialize state variables

The init() method is called once when the processor is initialized. This is where we define and register our state variables.

from pyspark.sql.streaming.stateful_processor import (
    StatefulProcessor, StatefulProcessorHandle
)
from pyspark.sql.types import (
    StructType, StructField, StringType, TimestampType
)

class HeartbeatMonitor(StatefulProcessor):

    def init(self, handle: StatefulProcessorHandle) -> None:
        self.handle = handle

        # Define state schemas
        last_seen_schema = StructType([
            StructField("timestamp", TimestampType(), True)
        ])

        device_info_schema = StructType([
            StructField("battery_level", StringType(), True),
            StructField("firmware_version", StringType(), True)
        ])

        # Initialize multiple independent state variables
        self.last_seen = handle.getValueState("last_seen", last_seen_schema)
        self.device_info = handle.getValueState(
            "device_info", device_info_schema
        )

In the init() method, we use StatefulProcessorHandle to define and initialize two per-key state variables, last_seen and device_info, using Spark’s StructType schemas and the getValueState() API. These state variables are automatically stored in RocksDB and checkpointed to S3, allowing for fault-tolerant state management across streaming micro-batches.

2.2 Handle incoming heartbeat events and register timers

The handleInputRows() method is called whenever new events arrive for a device. This is where we update state and register timers.

def handleInputRows(
    self, key: tuple, rows: Iterator[pd.DataFrame], timerValues
) -> Iterator[pd.DataFrame]:
    device_id = key[0]

    # Process incoming heartbeats - iterate through all rows to find latest
    latest_timestamp = None
    for pdf in rows:
        for _, row in pdf.iterrows():
            ts = row['timestamp']
            if pd.isna(ts):
                continue
            if latest_timestamp is None or ts > latest_timestamp:
                latest_timestamp = ts

    if latest_timestamp is None:
        yield pd.DataFrame()
        return

    # Check if we have existing state
    existing_timestamp = None
    if self.last_seen.exists():
        existing_state = self.last_seen.get()
        existing_timestamp = existing_state[0]

    # Update state only if new heartbeat is more recent
    if existing_timestamp is None or latest_timestamp > existing_timestamp:
        # Cancel existing timers (device is back online)
        for timer in self.handle.listTimers():
            self.handle.deleteTimer(timer)

        # Update state with new timestamp
        self.last_seen.update((latest_timestamp,))

        # Register timer for heartbeat deadline detection (30 seconds from now)
        current_time_ms = timerValues.getCurrentProcessingTimeInMs()
        deadline_ms = current_time_ms + HEARTBEAT_INTERVAL_MS
        self.handle.registerTimer(deadline_ms)

    yield pd.DataFrame()  # No output from input handling

The handleInputRows() method processes incoming heartbeat events for each device by extracting the latest timestamp, updating the last_seen state, and managing timers. When a newer heartbeat arrives, it cancels any existing timers and registers a new 30-second expiry timer to detect future inactivity. Because alerts are only emitted upon timer expiration, the method yields an empty dataframe during normal heartbeat processing.
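The update rule above can be modeled without Spark. The following is a minimal sketch in plain Python, assuming HEARTBEAT_INTERVAL_MS is 30,000 (the post uses the constant but does not show its definition). The function apply_heartbeats is a hypothetical stand-in for the state and timer calls in handleInputRows(), and it shows why a late, out-of-order heartbeat leaves both the state and the deadline untouched.

```python
from typing import Iterable, Optional, Tuple

HEARTBEAT_INTERVAL_MS = 30_000  # assumed value, matching the 30-second deadline in the text


def apply_heartbeats(
    last_seen_ms: Optional[int],
    deadline_ms: Optional[int],
    batch_ms: Iterable[int],
    now_ms: int,
) -> Tuple[Optional[int], Optional[int]]:
    """Return the (last_seen, deadline) pair after one micro-batch for one device.

    State and deadline only move when the batch contains a heartbeat newer
    than the one already stored, so late or duplicate events are ignored.
    """
    latest = max(batch_ms, default=None)
    if latest is None:
        return last_seen_ms, deadline_ms
    if last_seen_ms is None or latest > last_seen_ms:
        # Newer heartbeat: the old timer is replaced by a fresh deadline
        return latest, now_ms + HEARTBEAT_INTERVAL_MS
    return last_seen_ms, deadline_ms


# First batch: heartbeats stamped 10s and 20s, processed at t=21s
state = apply_heartbeats(None, None, [10_000, 20_000], now_ms=21_000)
# Late batch: a delayed heartbeat stamped 15s arrives at t=25s and is ignored
after_late = apply_heartbeats(*state, [15_000], now_ms=25_000)
```

Note that the deadline is computed from the processing time (now_ms), not from the event timestamp, which mirrors the use of getCurrentProcessingTimeInMs() in the method.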

2.3 Handle timer expiration and emit alerts

The handleExpiredTimer() method is called when a registered timer fires. This is where we detect offline devices and emit alerts.

def handleExpiredTimer(
    self, key: tuple, timerValues, expiredTimerInfo
) -> Iterator[pd.DataFrame]:
    device_id = key[0]
    current_time_ms = timerValues.getCurrentProcessingTimeInMs()

    # Verify state exists
    if not self.last_seen.exists():
        yield pd.DataFrame()
        return

    # Get last seen timestamp from state
    last_seen_state = self.last_seen.get()
    last_seen_timestamp = last_seen_state[0]

    if last_seen_timestamp is None or pd.isna(last_seen_timestamp):
        yield pd.DataFrame()
        return

    # Calculate how long device has been offline
    last_seen_ms = int(last_seen_timestamp.timestamp() * 1000)
    offline_duration_ms = current_time_ms - last_seen_ms
    offline_duration_seconds = offline_duration_ms / 1000.0

    # Create alert as a Pandas DataFrame
    alert_df = pd.DataFrame({
        "device_id": [device_id],
        "alert_type": ["DEVICE_OFFLINE"],
        "last_seen": [last_seen_timestamp],
        "offline_duration_seconds": [offline_duration_seconds],
        "alert_timestamp": [datetime.fromtimestamp(current_time_ms / 1000.0)]
    })

    # Register another timer for repeat alerts (every 60 seconds)
    next_alert_time = current_time_ms + ALERT_REPEAT_INTERVAL_MS
    self.handle.registerTimer(next_alert_time)

    yield alert_df  # Emit the alert

The handleExpiredTimer() method is triggered automatically when a device’s inactivity timer expires, retrieving the last_seen state to calculate the offline duration and yielding an alert dataframe to the output stream. It also registers a follow-up timer for repeat alerts every 60 seconds, which continues until a new heartbeat arrives and cancels the timer via handleInputRows().
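As a worked example of the offline-duration arithmetic, the sketch below reproduces the calculation in plain Python. The function name offline_seconds and the sample times are illustrative; the example uses timezone-aware datetimes so that the result does not depend on the local time zone of the machine running it.

```python
from datetime import datetime, timezone


def offline_seconds(last_seen: datetime, current_time_ms: int) -> float:
    """Seconds elapsed between the last heartbeat and the timer's processing time."""
    last_seen_ms = int(last_seen.timestamp() * 1000)
    return (current_time_ms - last_seen_ms) / 1000.0


# Last heartbeat at 00:00:20 UTC; the 30-second timer fires at 00:00:50 UTC
last_seen = datetime(2025, 1, 1, 0, 0, 20, tzinfo=timezone.utc)
fired_at = datetime(2025, 1, 1, 0, 0, 50, tzinfo=timezone.utc)
fired_at_ms = int(fired_at.timestamp() * 1000)

first_alert = offline_seconds(last_seen, fired_at_ms)            # 30.0
repeat_alert = offline_seconds(last_seen, fired_at_ms + 60_000)  # 90.0
```

The repeat alert reports 90 seconds because the duration is always measured from the last heartbeat, not from the previous alert.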

There are several ways you could extend this solution for production use. You could implement exponential backoff for repeat alerts to reduce noise, for example, alerting after 60 seconds, then 2 minutes, then 5 minutes, and so on. Other improvements could include adding severity escalation based on offline duration, integrating with notification services like Amazon SNS for downstream alerting, or setting a maximum retry limit to stop alerts for permanently decommissioned devices.
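One way to sketch the backoff and retry-limit ideas is shown below. The doubling schedule, the 15-minute cap, and the limit of six alerts are assumptions chosen for illustration; the post only names the general approach. The function next_repeat_delay_ms is a hypothetical helper, and using it inside handleExpiredTimer() would likely require an additional per-key state variable to count alerts already sent.

```python
from typing import Optional

BASE_REPEAT_MS = 60_000       # first repeat interval, from the 60-second repeat in the text
MAX_REPEAT_MS = 15 * 60_000   # assumed cap of 15 minutes between alerts
MAX_ALERTS = 6                # assumed limit before the device is treated as decommissioned


def next_repeat_delay_ms(alerts_sent: int) -> Optional[int]:
    """Delay before the next repeat alert, or None once the limit is reached.

    alerts_sent is the number of alerts already emitted for the current outage.
    """
    if alerts_sent >= MAX_ALERTS:
        return None  # stop registering timers for this device
    exponent = max(alerts_sent - 1, 0)
    return min(BASE_REPEAT_MS * 2 ** exponent, MAX_REPEAT_MS)


# Delays after the 1st, 2nd, ... 7th alert of one outage
delays = [next_repeat_delay_ms(n) for n in range(1, 8)]
```

With these values the delays are 1, 2, 4, and 8 minutes, then the 15-minute cap, after which no further timer is registered. A new heartbeat would reset the counter along with the timers.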

2.4 Apply transformWithState to the streaming DataFrame

Now we connect everything together by applying our HeartbeatMonitor processor to the streaming data.

# Read and parse heartbeat events from Kinesis
parsed_df = kinesis_df \
    .selectExpr("CAST(data AS STRING) as json_data") \
    .select(from_json(col("json_data"), heartbeat_schema).alias("heartbeat")) \
    .select(
        col("heartbeat.device_id"),
        to_timestamp(col("heartbeat.timestamp")).alias("timestamp"),
        col("heartbeat.battery_level"),
        col("heartbeat.signal_strength"),
        col("heartbeat.firmware_version")
    )

# Apply transformWithState for stateful processing
alerts_df = parsed_df \
    .groupBy("device_id") \
    .transformWithStateInPandas(
        statefulProcessor=HeartbeatMonitor(),
        outputStructType=alert_output_schema,
        outputMode="append",
        timeMode="processingTime"
    )

# Publish each alert row to SNS (defined before the query that references it)
def send_to_sns(batch_df, batch_id):
    rows = batch_df.collect()
    if not rows:
        return
    sns_client = boto3.client('sns', region_name=KINESIS_REGION)
    for row in rows:
        message = {
            "device_id": row["device_id"],
            "alert_type": row["alert_type"],
            "last_seen": str(row["last_seen"]),
            "offline_duration_seconds": row["offline_duration_seconds"],
            "alert_timestamp": str(row["alert_timestamp"])
        }
        sns_client.publish(
            TopicArn=SNS_TOPIC_ARN,
            Message=json.dumps(message),
            Subject=f"Device Offline Alert: {row['device_id']}"
        )

# Write alerts to SNS
query = alerts_df.writeStream \
    .outputMode("append") \
    .foreachBatch(send_to_sns) \
    .option("checkpointLocation", CHECKPOINT_LOCATION) \
    .trigger(processingTime="10 seconds") \
    .start()

The streaming pipeline parses JSON heartbeat events from Kinesis, partitions them by device_id, and applies the HeartbeatMonitor stateful processor using transformWithStateInPandas() with processing-time timers and append output mode. The resulting alert stream is written to SNS via foreachBatch() with checkpointing enabled for fault tolerance and micro-batches triggered every 10 seconds.

To summarize, implementing the heartbeat monitor requires just three methods. The init() method sets up your state variables, handleInputRows() processes incoming heartbeats and manages timers, and handleExpiredTimer() generates offline alerts. The transformWithState API handles the underlying complexity of state management, checkpointing, and timer scheduling automatically.

Step 3: Create IAM role for job execution

Create an IAM role that allows EMR Serverless to assume it for running your Spark job. For detailed instructions on creating an IAM role, see Creating an IAM role. Use the following trust policy for the role.

{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {
      "Service": "emr-serverless.amazonaws.com"
    },
    "Action": "sts:AssumeRole"
  }]
}

Attach a permissions policy that grants the role access to read from the Kinesis stream, write to the S3 bucket for checkpoints and application artifacts, and publish alerts to the SNS topic:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "KinesisAccess",
      "Effect": "Allow",
      "Action": [
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamSummary",
        "kinesis:ListShards",
        "kinesis:SubscribeToShard"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:*:stream/iot-heartbeats"
    },
    {
      "Sid": "SNSPublish",
      "Effect": "Allow",
      "Action": "sns:Publish",
      "Resource": "arn:aws:sns:us-east-1:*:iot-alerts"
    },
    {
      "Sid": "S3Access",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::your-bucket",
        "arn:aws:s3:::your-bucket/*"
      ]
    }
  ]
}
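The policy above uses a wildcard for the account ID in the Kinesis and SNS ARNs. If you prefer to scope the ARNs to a single account, a small script can generate the document. The following is a sketch only: build_job_policy is a hypothetical helper, and 123456789012 is a placeholder account ID.

```python
import json


def build_job_policy(region: str, account_id: str, stream: str, topic: str, bucket: str) -> dict:
    """Build the same permissions policy with ARNs scoped to one account."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "KinesisAccess",
                "Effect": "Allow",
                "Action": [
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamSummary",
                    "kinesis:ListShards",
                    "kinesis:SubscribeToShard",
                ],
                "Resource": f"arn:aws:kinesis:{region}:{account_id}:stream/{stream}",
            },
            {
                "Sid": "SNSPublish",
                "Effect": "Allow",
                "Action": "sns:Publish",
                "Resource": f"arn:aws:sns:{region}:{account_id}:{topic}",
            },
            {
                "Sid": "S3Access",
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket}", f"arn:aws:s3:::{bucket}/*"],
            },
        ],
    }


# Placeholder values; replace them with your own account, stream, topic, and bucket
policy = build_job_policy("us-east-1", "123456789012", "iot-heartbeats", "iot-alerts", "your-bucket")
policy_json = json.dumps(policy, indent=2)
```

The resulting policy_json string can be saved to a file and attached to the role in the same way as the hand-written policy.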

Step 4: Upload external dependencies required for executing the streaming job

In this step, you will download the required external dependencies and upload them to your S3 bucket to make them available for your EMR Serverless streaming job.

  • Spark Kinesis connector JAR (download link): copy it to your S3 bucket at s3://your-bucket/jars/spark-kinesis-connector.jar.
  • Protobuf dependency (download link): copy it to your S3 bucket at s3://your-bucket/pyfiles/protobuf_pkg.tar.gz.

Step 5: Submit the streaming job

Now that the application, IAM role, and dependencies are in place, you can submit the streaming job. This step configures the Spark job parameters and submits it to your EMR Serverless application in streaming mode. For more details on submitting jobs, see Starting a job run.

First, create a file named job-driver.json with the following content. Replace the S3 paths with the locations where you uploaded your script and dependencies in the previous steps.

{
  "sparkSubmit": {
    "entryPoint": "s3://your-bucket/scripts/heartbeat_monitor.py",
    "sparkSubmitParameters": "--jars s3://your-bucket/jars/spark-kinesis-connector.jar --archives s3://your-bucket/pyfiles/protobuf_pkg.tar.gz#protobuf_pkg --conf spark.executor.cores=4 --conf spark.executor.memory=16g --conf spark.driver.cores=4 --conf spark.driver.memory=16g --conf spark.executor.instances=3 --conf spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider --conf spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled=true --conf spark.emr-serverless.driverEnv.PYTHONPATH=./protobuf_pkg --conf spark.executorEnv.PYTHONPATH=./protobuf_pkg"
  }
}
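Because sparkSubmitParameters is a single long string, it can be easier to review when assembled from a list of settings. The sketch below generates the same structure in Python; build_job_driver and SPARK_CONF are illustrative names, and your-bucket is the placeholder used throughout this post.

```python
import json

BUCKET = "your-bucket"  # placeholder bucket name

# Spark settings from the job driver above, one entry per --conf flag
SPARK_CONF = {
    "spark.executor.cores": "4",
    "spark.executor.memory": "16g",
    "spark.driver.cores": "4",
    "spark.driver.memory": "16g",
    "spark.executor.instances": "3",
    "spark.sql.streaming.stateStore.providerClass":
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true",
    "spark.emr-serverless.driverEnv.PYTHONPATH": "./protobuf_pkg",
    "spark.executorEnv.PYTHONPATH": "./protobuf_pkg",
}


def build_job_driver(bucket: str, conf: dict) -> dict:
    """Assemble the job driver document from the bucket name and Spark settings."""
    params = [
        f"--jars s3://{bucket}/jars/spark-kinesis-connector.jar",
        f"--archives s3://{bucket}/pyfiles/protobuf_pkg.tar.gz#protobuf_pkg",
    ]
    params += [f"--conf {key}={value}" for key, value in conf.items()]
    return {
        "sparkSubmit": {
            "entryPoint": f"s3://{bucket}/scripts/heartbeat_monitor.py",
            "sparkSubmitParameters": " ".join(params),
        }
    }


job_driver = build_job_driver(BUCKET, SPARK_CONF)
job_driver_json = json.dumps(job_driver, indent=2)  # write this string to job-driver.json
```

Keeping the settings in a dictionary makes it simpler to change a single value, such as executor memory, without editing a long one-line string.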

Then, run the following command to submit the job. Replace the application ID and account ID with your own values.

aws emr-serverless start-job-run \
  --application-id <YOUR_APPLICATION_ID> \
  --execution-role-arn arn:aws:iam::<ACCOUNT_ID>:role/EMRServerlessJobRole \
  --job-driver file://job-driver.json \
  --mode STREAMING \
  --retry-policy maxFailedAttemptsPerHour=1 \
  --region us-east-1

Running transformWithState on Amazon EMR Serverless provides several operational advantages over self-managed Spark clusters. In streaming mode, the Spark driver remains alive between micro-batches, eliminating the overhead of repeatedly starting and stopping the application. You don’t need to provision or manage executors because EMR Serverless automatically scales compute resources up and down based on workload demands, so you only pay for what you use. Your IoT heartbeat monitor can handle traffic spikes, such as thousands of devices reconnecting simultaneously after a network outage, without manual intervention. EMR Serverless also provides built-in job resiliency, real-time monitoring, and enhanced log management, reducing the operational burden of running streaming applications in production.

Testing the solution

Now that our streaming application is deployed, let’s test it by sending heartbeat events and observing the offline detection behavior.

Step 1: Open AWS CloudShell

Open AWS CloudShell in your AWS account from the AWS Management Console.

Step 2: Send heartbeat events using CLI

Execute the following bash script to send heartbeat events for three devices every 10 seconds.

#!/bin/bash

# Send one heartbeat per device every 10 seconds until interrupted (Ctrl+C)
while true; do
  for device_id in device-001 device-002 device-003; do
    payload="{\"device_id\":\"${device_id}\",\"timestamp\":\"$(date -u +%Y-%m-%dT%H:%M:%SZ)\",\"battery_level\":87.5,\"signal_strength\":-42.3,\"firmware_version\":\"v2.1.0\"}"

    # Strip newlines from the base64 output so the value is passed as one argument
    aws kinesis put-record \
      --stream-name iot-heartbeats \
      --partition-key "${device_id}" \
      --data "$(printf '%s' "${payload}" | base64 | tr -d '\n')" \
      --region us-east-1
  done

  sleep 10
done

The script stamps each event with the current UTC time and keeps sending events at regular intervals until you interrupt it.

Step 3: Observe normal operation

As you send heartbeat events every 10 seconds, the Spark application receives each event and updates the device’s state. A timer is then registered for 30 seconds in the future. Each new heartbeat cancels the existing timer and registers a new one, effectively resetting the countdown. As long as heartbeats continue to arrive within the 30-second window, no alerts are sent.

Timeline diagram showing normal device operation over 60 seconds with heartbeats arriving every 10 seconds, each resetting the 30-second timer

The above timeline diagram shows a 60-second window of normal device operation. Heartbeat events arrive every 10 seconds (at 0s, 10s, 20s, 30s, 40s, 50s, and 60s), each resetting the 30-second timer window. Because every heartbeat arrives well within the 30-second threshold, the timer never expires, the device state remains online, and no alerts are triggered.

Step 4: Test offline detection

Stop sending heartbeat events for the device and wait 30 seconds. You should receive an SNS alert indicating the device is offline.

Timeline diagram showing offline detection over 110 seconds with the 30-second timer expiring and triggering SNS alerts

Timeline diagram showing offline detection over 110 seconds. Device sends heartbeats at 0s, 10s, and 20s before going offline. The 30-second timer expires at 50s triggering Alert #1 via SNS, followed by a repeat Alert #2 at 110s after a 60-second repeat timer.

If you continue to not send heartbeats, additional alerts will be sent every 60 seconds.

Step 5: Test device recovery

Resume sending heartbeat events using the same CLI command. The application will cancel all existing timers for the device and will stop sending SNS alerts.

Timeline diagram showing device recovery lifecycle with timers canceled and device returning to online state

Timeline diagram showing the complete device recovery lifecycle over 140 seconds across three phases: normal operation with heartbeats, offline detection with SNS alerts, and recovery where timers are canceled and the device returns to online state
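The three phases can also be checked with a small simulation in plain Python. This is a simplified model, not the Spark job: it assumes heartbeats are processed at their own timestamp and timers fire exactly on time, with no micro-batch delay. The function simulate_alerts and the recovery time of 120 seconds are assumptions made for illustration.

```python
from typing import List, Tuple

HEARTBEAT_DEADLINE_S = 30   # first alert after 30 seconds of silence
ALERT_REPEAT_S = 60         # repeat alerts every 60 seconds


def simulate_alerts(heartbeats_s: List[int], end_s: int) -> List[Tuple[int, int]]:
    """Return (alert_time, offline_seconds) pairs for one device."""
    alerts = []
    beats = sorted(heartbeats_s)
    for i, beat in enumerate(beats):
        is_last = i + 1 == len(beats)
        # A timer only fires if no newer heartbeat arrives first
        limit = end_s + 1 if is_last else beats[i + 1]
        fire_at = beat + HEARTBEAT_DEADLINE_S
        while fire_at < limit:
            alerts.append((fire_at, fire_at - beat))
            fire_at += ALERT_REPEAT_S
    return alerts


# Normal operation: a heartbeat every 10 seconds for 60 seconds
normal = simulate_alerts([0, 10, 20, 30, 40, 50, 60], end_s=60)

# Offline at 20s, recovery assumed at 120s, run ends at 140s
lifecycle = simulate_alerts([0, 10, 20, 120, 130, 140], end_s=140)
```

In this model the normal run produces no alerts, while the lifecycle run produces alerts at 50 and 110 seconds and none after the device recovers.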

Clean up

To avoid incurring ongoing charges, follow these steps to clean up the resources.

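Step 1: Stop the EMR Serverless application

Cancel your running streaming job, then stop the application. An application must be in the CREATED or STOPPED state before it can be deleted.

aws emr-serverless cancel-job-run \
  --application-id <your-application-id> \
  --job-run-id <your-job-run-id>

aws emr-serverless stop-application \
  --application-id <your-application-id>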

Step 2: Delete the EMR Serverless application

aws emr-serverless delete-application \
  --application-id <your-application-id>

Step 3: Delete the Kinesis data stream

aws kinesis delete-stream --stream-name iot-heartbeats

Step 4: Remove S3 objects

Delete the checkpoint data, scripts, and dependencies from your S3 bucket:

aws s3 rm s3://your-bucket/checkpoints/ --recursive
aws s3 rm s3://your-bucket/scripts/ --recursive
aws s3 rm s3://your-bucket/jars/ --recursive
aws s3 rm s3://your-bucket/pyfiles/ --recursive

Real-world use cases for stateful streaming

The transformWithState API enables developers to build sophisticated streaming applications that were previously difficult to implement. Here are a few examples of how it can be applied across industries.

Telecommunications and network monitoring: Telecom providers need to detect network anomalies and SLA violations as they happen across millions of concurrent sessions. With transformWithState, developers can maintain per-session state to track call detail records, compare real-time network metrics against established baselines, and trigger alerts the moment thresholds are breached. Automatic state TTL ensures that completed session records are cleaned up without manual intervention.

Financial services and fraud detection: Detecting fraud requires correlating multiple signals across a sequence of transactions in real time. With transformWithState, developers can maintain per-account state that tracks transaction histories, flags suspicious patterns like rapid purchases across geographies, and calculates rolling risk scores. Multiple state variables per key allow tracking different dimensions of activity, such as transaction velocity, location changes, and spending deviations, within a single stateful operator.

E-commerce and customer engagement: Understanding customer behavior in real time is critical for driving conversions. Using transformWithState, developers can build session-aware applications that track browsing and cart activity with timer-based state expiration, detecting cart abandonment after a configurable timeout and triggering personalized re-engagement notifications. The State Data Source Reader enables teams to inspect session state mid-stream, making it easier to debug and validate real-time customer journey logic.

Conclusion

Apache Spark 4.0’s transformWithState API represents a significant advancement in stateful stream processing, making it simpler to build complex real-time applications like IoT device monitoring. Combined with Amazon EMR Serverless, you get a fully managed platform that scales automatically and eliminates infrastructure management overhead.

This post demonstrates how to use the native timer support capability of transformWithState to build a real-time IoT device monitoring application. We encourage you to explore other capabilities such as Automatic State TTL, Schema Evolution, and Multiple State Variables on Amazon EMR Serverless to build more sophisticated streaming applications tailored to your needs.


About the authors

Raj Ramasubbu

Raj Ramasubbu is a Senior Specialist Solutions Architect for Analytics and AI at AWS. He partners with ISV customers to design and implement modern data platforms that balance performance, cost efficiency, and operational resilience at scale. With over two decades of experience spanning data engineering, advanced analytics, and machine learning across industries such as healthcare, financial services, and retail, Raj brings a practitioner’s perspective to solving complex data challenges in the cloud.

Rekha Veeraraghavan

Rekha Veeraraghavan is a Technical Account Manager at Amazon Web Services (AWS). She serves as a Subject Matter Expert in AWS Analytics services, specializing in AWS Glue and Amazon Athena. Rekha provides expert guidance and technical support to enterprise and strategic customers, helping them optimize data analytics solutions. With deep expertise in data engineering, she enables organizations to build scalable, efficient, and cost-effective data processing pipelines on AWS.

Praveen Krishnamoorthy Ravikumar

Praveen Krishnamoorthy Ravikumar is an Analytics Specialist Solutions Architect at AWS. He helps customers design and implement modern data and analytics platforms that leverage the scalability, flexibility, and innovation of the cloud. He is passionate about solving complex data challenges and enabling organizations to unlock actionable insights from their data.

Announcing general availability of Apache Spark 4.0 on Amazon EMR

Post Syndicated from Suthan Phillips original https://aws.amazon.com/blogs/big-data/announcing-general-availability-of-apache-spark-4-0-on-amazon-emr/

As data volumes grow and pipelines become more complex, you need an engine that handles semi-structured data natively, supports streaming state without operational overhead, and allows you to develop interactively against production-scale compute. Spark 4.0 addresses these three challenges that slow modern data teams: wrangling semi-structured data, managing streaming state, and bridging the gap between interactive development and production-scale execution. With VARIANT data type, state-management improvements, and Spark Connect availability in Spark 4.0, you can now handle these workloads with less code, fewer operational trade-offs, and faster iteration cycles, all on Amazon EMR optimized runtime, which runs Spark workloads up to 4.5× faster than open-source Apache Spark.

With this general availability announcement, Spark 4.0 is now supported across Amazon EMR Serverless, Amazon EMR on EC2, and Amazon EMR on EKS deployment options. In this post, you’ll learn about key Spark 4.0 capabilities now available on Amazon EMR including Spark Connect, the Variant data type, SQL scripting, Python API improvements, and streaming enhancements, along with infrastructure changes in the new emr-spark-8.0 release.

New features in GA

Apache Spark 4.0 introduces several capabilities that are now generally available on Amazon EMR.

Spark Connect

Most Spark development is iterative and disconnected from production. You write code locally, test it against a sample, then package and deploy it to a cluster. It often fails due to data issues at scale, environment mismatches, or dependency conflicts. The feedback loop is slow, and the gap between development and production is where most time is lost.

Spark Connect closes that gap by introducing a decoupled client-server architecture that changes how your application communicates with Spark. In previous versions, your application code and the Spark driver ran inside the same JVM process, meaning issues in your application code could destabilize the Spark driver and disrupt the entire session. With Spark Connect, your application runs as a lightweight client that submits logical plans to a Spark server over gRPC, and the server handles execution independently. Your client doesn’t require a local Spark installation or a JVM, and it doesn’t need to run on a cluster node. It only needs connectivity to the server endpoint.

With Amazon EMR, this means you can write PySpark from your preferred IDE (VS Code, PyCharm), Jupyter notebooks, Amazon SageMaker Unified Studio Data Notebooks, Amazon Q Developer, or Kiro, and Spark Connect routes your DataFrame transformations and SQL queries to Amazon EMR for execution over a secure connection. You can set breakpoints, inspect variables, and step through transformations while your data is processed on serverless compute, catching issues during development instead of after deployment. There are no clusters to provision, no code to repackage, and no infrastructure to manage.

This architecture also improves session resilience. A client-side failure doesn’t affect the Spark server, so other workloads continue to run without disruption. Spark Connect is an open Apache Spark standard. The same PySpark code works across different Spark backends by changing the connection endpoint.

For example, connecting to Amazon EMR Serverless from your local IDE takes only a few lines of PySpark code:

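from pyspark.sql import SparkSession

# Open a remote session against the Spark Connect endpoint.
# Replace <endpoint> and <auth-token> with the values returned when you start a session.
spark = SparkSession.builder \
    .remote("sc://<endpoint>:443/;use_ssl=true;x-aws-proxy-auth=<auth-token>") \
    .getOrCreate()

# The query and the aggregation run on Amazon EMR Serverless, not on the local machine.
df = spark.sql("SELECT * FROM my_catalog.my_database.my_table")
df.groupBy("category").count().show()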

On Amazon EMR Serverless, start a session to retrieve your endpoint and auth token, then connect remotely using the standard sc:// protocol. Every Spark operation executes on Amazon EMR Serverless while your code stays local.
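The connection string in the snippet above can also be assembled programmatically once you have the endpoint and token. The following is a minimal sketch, not an AWS-provided helper: `build_remote_url` is a hypothetical function name, the endpoint and token values are placeholders, and the `use_ssl` and `x-aws-proxy-auth` parameters are copied from the example above. It assumes the token can be embedded as-is; check the Amazon EMR documentation for any encoding requirements.

```python
def build_remote_url(endpoint: str, auth_token: str, port: int = 443) -> str:
    """Assemble a Spark Connect URL in the sc://host:port/;key=value form."""
    if not endpoint or not auth_token:
        raise ValueError("endpoint and auth_token are both required")
    # Parameter names are taken from the example above.
    params = {"use_ssl": "true", "x-aws-proxy-auth": auth_token}
    param_str = ";".join(f"{key}={value}" for key, value in params.items())
    return f"sc://{endpoint}:{port}/;{param_str}"


# Placeholder values for illustration only.
url = build_remote_url("example-endpoint.invalid", "EXAMPLE_TOKEN")
print(url)
```

The resulting string is what you would pass to `SparkSession.builder.remote(...)`. Keeping the token out of source code (for example, reading it from an environment variable) is a sensible default.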

The following video showcases Spark Connect and Variant features together.

For a step-by-step getting-started walkthrough, visit Announcing Spark Connect on Amazon EMR Serverless: Interactive PySpark development, anywhere.

Data type and table format enhancements

This section covers the VARIANT data type and Apache Iceberg V3 support. These two additions improve how you store and query semi-structured data.

Apache Iceberg V3 support

Amazon EMR has supported Apache Iceberg V3 since Amazon EMR release 7.x, introducing capabilities such as deletion vectors and row lineage. With Spark 4.0 on Amazon EMR, that support deepens, unlocking capabilities that had a hard dependency on Spark 4.0 itself, including VARIANT column storage and unknown type handling. For teams running data lakehouse workloads, the table format underneath your data determines how efficiently it is stored, how reliably it evolves, and how safely multiple tools can read and write it simultaneously.

What this means for your workloads:

  • VARIANT and Iceberg working together: VARIANT columns can now be stored natively in Iceberg V3 tables, combining efficient semi-structured data storage with Iceberg’s schema evolution and time travel capabilities. This eliminates the pipeline complexity of upfront schema definitions.
  • More efficient partitioning: Multi-argument transforms accept multiple input columns in a single partition expression, such as range (order_date, product_category), giving you finer control over data layout. They produce a single composite key instead of separate columns whose cartesian product can explode partition count. The result is less data scanned, faster queries, and lower compute costs for high volume workloads.
  • Safer schema evolution: Unknown type handling ensures that older readers do not break when newer writers introduce new column types, reducing coordination overhead across teams and tools during upgrades.
  • Fine-grained access control (FGAC): Column-level and row-level permissions are now available through AWS Lake Formation, giving you governed access control at a granular level across your Iceberg tables, with no custom access logic required.

Variant data type

The new VARIANT data type, supported through Apache Iceberg V3, brings native support for semi-structured JSON data directly into Spark SQL. This matters most when you don’t control the data being written, because platform teams and shared services often receive data from partners and upstream teams with unpredictable or evolving structures.

Without VARIANT, handling semi-structured data meant accepting real tradeoffs: defining schemas upfront that broke when data evolved, storing everything as strings with heavy parsing costs on every read, or building wide tables with nullable columns that wasted storage on empty fields. The most realistic option was breaking nested structures apart into separate columns before running queries. This ETL step added latency, increased storage costs, and broke every time an upstream team added or removed fields from their data feed.

VARIANT eliminates that flattening step entirely. Data stays nested and is queryable with variant_get(), with no separate ETL pipeline. You ingest without defining a schema first and apply structure at query time.

For example, querying nested fields is now a single expression:

SELECT
    variant_get(payload, '$.user.name') AS user_name,
    variant_get(payload, '$.event.type') AS event_type,
    variant_get(payload, '$.event.timestamp') AS event_timestamp
FROM VALUES
    (PARSE_JSON('{"user":{"name":"Alice"},"event":{"type":"click","timestamp":"2025-03-01"}}'))
AS t(payload)
-- Extract the timestamp as a STRING so it can be compared with a string literal
WHERE variant_get(payload, '$.event.timestamp', 'string') > '2025-01-01';
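To make the query-time behavior concrete, here is a plain-Python analogy of path-based extraction. It is not how Spark implements `variant_get()`, and `get_path` is a hypothetical helper that handles only dotted object keys. It illustrates one property worth knowing: a record that lacks a field yields a null for that path instead of breaking ingestion. The second record is invented sample data.

```python
import json


def get_path(document, path):
    """Walk a '$.a.b' style path through nested dicts; return None if absent."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    current = document
    for key in [part for part in path[1:].split(".") if part]:
        if not isinstance(current, dict) or key not in current:
            return None  # a missing field becomes a null, not an error
        current = current[key]
    return current


# Two payloads with different shapes, as an upstream feed might send them.
records = [
    '{"user":{"name":"Alice"},"event":{"type":"click","timestamp":"2025-03-01"}}',
    '{"user":{"name":"Bob"},"event":{"type":"view"}}',
]

rows = [
    (get_path(doc, "$.user.name"), get_path(doc, "$.event.timestamp"))
    for doc in map(json.loads, records)
]
print(rows)
```

Both records are accepted even though the second has no timestamp; the structure is applied only when the fields are read.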

For a deep dive into how VARIANT is stored in Parquet, shredding mechanics, and a full end-to-end walkthrough on Amazon EMR Serverless, see Beyond JSON blobs: Implementing the VARIANT data type in Apache Iceberg V3.

Key benefits for your workloads:

  • Reduced pipeline fragility: Schema changes no longer break ingestion. Data lands as-is, and you apply structure at query time based on what each analysis needs, without upstream coordination.
  • Improved query performance: Optimized storage format enables efficient access to nested fields without parsing overhead, so queries run faster even on deeply nested payloads.
  • Better storage efficiency: Compact encoding eliminates the waste of NULL-heavy wide tables, reducing storage costs for semi-structured data at scale.

VARIANT is especially well-suited where schemas are unpredictable or evolving: IoT and sensor data with device-specific payloads, logging and telemetry with variable event structures, and API responses and webhooks from third-party services where the schema changes without notice.

SQL enhancements

You can now write and maintain Spark pipelines using the same standard SQL you already know, with no Spark-specific functions or syntax required. Apache Spark 4.0 expands ANSI SQL compliance so that functions behave consistently, opening Spark to anyone who can write SQL rather than requiring Spark specialists.

Standard SQL constructs such as OFFSET, LIMIT ... OFFSET, and lateral column aliases now work as expected. For example:

-- Standard OFFSET syntax now supported
SELECT id, name
FROM VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol'), (4, 'Dave') AS t(id, name)
ORDER BY id
LIMIT 2 OFFSET 1;

-- Lateral column aliases work inline
SELECT amount * 1.1 AS adjusted, adjusted * 0.08 AS tax
FROM VALUES (100.0), (200.0), (350.0) AS t(amount);
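For readers less familiar with these two constructs, the following plain-Python sketch models what the queries above compute. It only illustrates the semantics and does not use Spark; `Decimal` stands in for SQL decimal arithmetic, which is an assumption about how the literals are typed.

```python
from decimal import Decimal

rows = [(1, "Alice"), (2, "Bob"), (3, "Carol"), (4, "Dave")]

# ORDER BY id LIMIT 2 OFFSET 1: sort, skip one row, then keep two.
offset, limit = 1, 2
page = sorted(rows, key=lambda row: row[0])[offset:offset + limit]

# Lateral column alias: "tax" reuses "adjusted" from the same SELECT list.
amounts = [Decimal("100.0"), Decimal("200.0"), Decimal("350.0")]
taxed = []
for amount in amounts:
    adjusted = amount * Decimal("1.1")
    tax = adjusted * Decimal("0.08")
    taxed.append((adjusted, tax))

print(page)
print(taxed)
```

Without lateral column aliases, the second query would need a subquery or a repeated `amount * 1.1` expression to compute `tax`.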

Beyond syntax, SQL scripting brings procedural logic directly into Spark SQL. You can now use variables, IF/ELSE conditionals, loops, and multi-statement blocks without switching to Python or JVM-based languages. Before SQL scripting, multi-step workflows (such as ETL logic with conditional branching or iterative data quality checks) required wrapping SQL statements in Python or Scala to handle control flow. SQL scripting removes that dependency. SQL-native teams can author and maintain these workflows entirely in SQL.

Key benefits:

  • Simplified ETL workflows: Multi-step transformation logic that previously required an external language can now live entirely in SQL, reducing code complexity and making pipelines easier to build and maintain.
  • Lower barrier for SQL-native teams: Teams that primarily work in SQL no longer need to context-switch into Python or Scala to implement conditional logic or iterative processing. The entire pipeline stays in SQL.

Python advances

Earlier versions of Spark required Python users to step outside Python at two key points: building custom data connectors required Java or Scala, and diagnosing UDF performance lacked built-in visibility. Spark 4.0 addresses both directly, removing the two biggest blockers for organizations where Python is the primary language.

Python Data Source API

With the Python Data Source API, you can build custom, reusable data connectors in Python without any JVM or Scala code. Custom connectors participate in Spark’s query optimization, including predicate pushdown and schema inference. This matters when your data system only has a Python client, or when your team does not have Java or Scala expertise: you can now wrap any custom format or external source as a Spark DataFrame source or sink without leaving Python.

Spark 4.0 also introduces polymorphic Python UDTFs (User-Defined Table Functions) that can return different schema shapes depending on input, with an analyze() method that produces a schema dynamically based on parameters. This is particularly useful for processing varying JSON schemas or splitting inputs into a variable set of outputs.

If you’re ingesting data from a REST API with a Python client, you can implement a custom Spark data source entirely in Python, register it, and use it directly in Spark SQL or the DataFrame API. What previously required a Scala developer and a custom JVM connector can now be built and maintained by your Python team running the pipeline.

Python UDF enhancement

Python UDF profiling provides built-in visibility into execution time, serialization overhead, and memory usage at the individual UDF level without external tooling. Additionally, it enables performance or memory profiling depending on what you need to diagnose.

Arrow-based vectorized UDF support reduces serialization overhead between Python and the JVM using a columnar format, replacing row-at-a-time processing with batch-oriented columnar exchange.

Together, these give you a complete optimization loop: build custom connectors in Python, profile your UDF performance, and optimize with confidence.

Practical benefits for Python teams:

  • Lower barrier for Python teams: Custom data connectors no longer require Java or Scala knowledge. If your data system has a Python client, you can build a production-grade Spark connector entirely in Python.
  • Flexible data transformation: Polymorphic UDTFs let your functions adapt to varying input schemas dynamically, reducing the need to write and maintain multiple transformation functions for different data shapes.
  • Faster UDF optimization: Built-in profiling surfaces exactly where execution time and memory are being spent at the UDF level, replacing guesswork with direct visibility and making performance tuning significantly faster.

Streaming enhancements

This section covers improvements to state management and observability in structured streaming workloads.

Queryable state for structured streaming

Structured streaming jobs maintain state continuously (running totals, session windows, aggregated counts). However, in earlier versions of Spark that state was locked inside the running job. Inspecting it meant stopping the stream or manually parsing checkpoint files. For production workloads, this created real operational risk: teams had no way to verify whether state was correct, corrupted, or drifting without taking the job down.

Time-sensitive applications faced an additional problem: timers in Spark streaming only fired when new data arrived, so a five-minute heartbeat timeout could silently miss its window if no data came in, making applications like heartbeat monitoring and session tracking unreliable by design.

Spark 4.0 changes this. The new transformWithState API provides deterministic timer execution because timers fire on schedule regardless of data arrival patterns. It also delivers automatic state TTL to prevent unbounded growth, schema evolution without restarting from a new checkpoint, and state observability for mid-stream debugging. External systems can now read live aggregated state from a running streaming job without interrupting it. State is accessible as a DataFrame, queryable during development, verifiable in unit tests, and inspectable during production incidents without touching the running stream.
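The difference between the two timer behaviors described above can be shown with a toy simulation. This is plain Python, not Spark code: `data_driven_alerts` and `scheduled_alerts` are hypothetical functions that model, under simplified assumptions, timers that are checked only when data arrives versus timers that fire on schedule.

```python
def data_driven_alerts(event_times, timeout):
    """Model timers that are only evaluated when a new event arrives."""
    alerts = []
    last_seen = None
    for t in sorted(event_times):
        if last_seen is not None and t - last_seen > timeout:
            alerts.append(t)  # the gap is noticed only once data shows up again
        last_seen = t
    return alerts


def scheduled_alerts(event_times, timeout, horizon):
    """Model timers that fire at last_seen + timeout even if no data arrives."""
    alerts = []
    times = sorted(event_times)
    for i, t in enumerate(times):
        deadline = t + timeout
        next_event = times[i + 1] if i + 1 < len(times) else None
        # A heartbeat arriving before the deadline resets the timer.
        timer_reset = next_event is not None and next_event <= deadline
        if not timer_reset and deadline <= horizon:
            alerts.append(deadline)
    return alerts


heartbeats = [0, 60, 120]  # seconds; the device goes silent after t=120
late = data_driven_alerts(heartbeats, timeout=300)
on_time = scheduled_alerts(heartbeats, timeout=300, horizon=1000)
print(late, on_time)
```

In this model the data-driven variant never raises an alert for the silent device, while the scheduled variant raises one at t=420, five minutes after the last heartbeat.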

This is backed by three improvements working together. First, the transformWithState operator replaces mapGroupsWithState from earlier Spark versions (which had limited timer support and no TTL-based cleanup). Second, the state data source reader exposes streaming state as a queryable DataFrame. Lastly, RocksDB changelog checkpointing improvements address throughput bottlenecks in high-volume stateful workloads.

Consider a fleet of 100,000 IoT sensors across manufacturing facilities, each requiring an alert within 30 seconds of going offline. Tracking heartbeat state per device, managing independent timers, handling late data, and cleaning up decommissioned devices at scale had no clean solution in earlier Spark versions. The transformWithState operator handles all of this natively, and queryable state lets your operations team inspect live device state in real time without stopping the stream:

# Timers fire on schedule regardless of data arrival, making heartbeat monitoring reliable
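from pyspark.sql.types import StructType, StructField, StringType

# HeartbeatMonitor (a StatefulProcessor subclass) and events_df (a streaming
# DataFrame) are assumed to be defined elsewhere.
alerts = events_df.groupBy("device_id").transformWithState(
    HeartbeatMonitor(),
    outputStructType=StructType([
        StructField("device_id", StringType()),
        StructField("alert", StringType())
    ]),
    outputMode="Append",
    timeMode="ProcessingTime"  # timers are driven by processing time
)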

Combined with Amazon EMR Serverless, which scales compute automatically based on workload demands, you can deploy stateful streaming pipelines without managing clusters or predicting capacity.

Benefits:

  • Real-time operational visibility: Live streaming state is now accessible externally without interrupting the job, powering dashboards and monitoring systems that reflect current aggregations.
  • Faster debugging: State values can be queried directly as a DataFrame, making it significantly easier to diagnose production incidents and verify correctness during development.
  • Better performance at scale: RocksDB checkpointing improvements reduce bottlenecks in high-throughput stateful workloads, improving reliability for long-running streaming jobs.

What’s new in the emr-spark-8.0 release

Beyond the Spark 4.0 capabilities covered in the preceding sections, the emr-spark-8.0 release introduces infrastructure and runtime changes that simplify how you deploy, patch, and manage Amazon EMR workloads. The release focuses exclusively on Spark, reducing the surface area you need to patch and test.

Fewer components to patch and test

The emr-spark-8.0 release includes Apache Spark 4.0, Apache Iceberg 1.10, Apache Hudi 1.0.2, Delta Lake 4.0, and connectors for Amazon DynamoDB, Amazon Kinesis, Amazon Redshift, and Amazon Simple Storage Service (Amazon S3) (via the S3A connector). Apache Livy and JupyterEnterpriseGateway are available as opt-in components on Amazon EMR on EC2. If your workloads require Apache Flink, Trino, Presto, or other execution engines, you can continue to use Amazon EMR 7.x releases.

Simplified patch management

You can specify emr-spark-8.0.x when creating a cluster or application, and Amazon EMR will automatically select the latest patch version (for example, emr-spark-8.0.1, then emr-spark-8.0.2, and so on as patches are released). This “.x” wildcard is supported through AWS APIs and the AWS Command Line Interface (AWS CLI). On Amazon EMR on EKS and Amazon EMR Serverless, new jobs automatically run on the latest Amazon Linux patches, so you no longer need to track date-based version tags.
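The wildcard semantics can be pictured with a short sketch. This is not how Amazon EMR implements the selection: `resolve_release` is a hypothetical function and the list of available releases is invented for illustration. It shows the intended outcome, which is that the highest patch number wins and that the comparison is numeric rather than alphabetical.

```python
import re


def resolve_release(label, available):
    """Return the highest patch release matching an 'emr-spark-X.Y.x' label."""
    match = re.fullmatch(r"(emr-spark-\d+\.\d+)\.x", label)
    if match is None:
        return label  # a fully pinned label is used unchanged
    prefix = match.group(1) + "."
    patches = [
        int(name[len(prefix):])
        for name in available
        if name.startswith(prefix) and name[len(prefix):].isdigit()
    ]
    if not patches:
        raise LookupError(f"no release matches {label}")
    return f"{prefix}{max(patches)}"


# Hypothetical release list, for illustration only.
available = ["emr-spark-8.0.0", "emr-spark-8.0.1", "emr-spark-8.0.2", "emr-spark-8.0.10"]
latest = resolve_release("emr-spark-8.0.x", available)
print(latest)
```

Pinning an exact label such as emr-spark-8.0.1 remains an option when you need reproducible environments.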

Latest Python, Java, and Scala runtimes

The release ships with modernized runtimes: Python 3.11 as the default, with support for Python 3.12 and 3.13. Java 17 is the default, with Java 21 also available. Both are provided through Amazon Corretto. Scala 2.13 is the supported Scala runtime.

A few infrastructure changes to note: AWS SDK for Java v2 replaces v1, bringing improved performance and alignment with the latest AWS APIs. The EMR S3A connector replaces EMR File Systems (EMRFS) for Amazon S3 access, delivering better performance and compatibility with open-source Spark. For shuffle-intensive workloads on Amazon EMR Serverless, enabling Serverless Storage can reduce data processing costs by up to 20%. For more information, see Optimize Amazon EMR Runtime for Apache Spark with EMR S3A for benchmarks, Amazon EMR Serverless eliminates local storage provisioning, and Reducing costs for shuffle-heavy Apache Spark workloads with serverless storage for Amazon EMR Serverless.

Migration and compatibility notes

If you are migrating from Spark 3.5 to Spark 4.0, the Apache Spark upgrade agent for Amazon EMR can accelerate your migration by analyzing existing applications and identifying changes needed for Spark 4.0 compatibility. For more information, see the upgrade guidance.

If your workflows use Apache Pig, Apache Oozie, JupyterHub, Apache Zeppelin, or Hue, you can continue to use Amazon EMR 7.x releases. These components are not included in emr-spark-8.0. For interactive Spark development, use Amazon EMR Studio, with Apache Livy and JupyterEnterpriseGateway available on Amazon EMR on EC2.

For the complete list of supported components and configurations, see the Amazon EMR release guide.

Get started

Spark 4.0 is now available across Amazon EMR on EC2, Amazon EMR on EKS, and Amazon EMR Serverless. To begin, choose your deployment model and follow the relevant getting started guide.

Conclusion

Spark 4.0 on Amazon EMR delivers improvements across query validation, semi-structured data handling, Python development, and streaming observability. ANSI SQL mode catches invalid operations at query time rather than silently propagating nulls downstream, and SQL scripting removes the need to context-switch between SQL and Python for complex ETL logic. The VARIANT data type eliminates parsing overhead for semi-structured JSON workloads and can now be stored natively in Iceberg V3 tables with fine-grained access control at the column and row level. Queryable streaming state gives you live visibility into running jobs without interruption, and Spark Connect lets you develop against Amazon EMR from Jupyter notebooks, Amazon SageMaker Unified Studio Data Notebooks, Amazon Q Developer, Kiro, or your preferred IDE without managing cluster connectivity.

Ready to build or migrate? Choose your deployment model from the preceding section and get started today. For detailed guidance, see the Amazon EMR Release Guide and the Amazon EMR Serverless User Guide.


About the authors

Suthan Phillips

Suthan is a Senior Analytics Architect at AWS, where he helps customers design and optimize scalable, high-performance data solutions that drive business insights. He combines architectural guidance on system design and scalability with best practices to provide efficient, secure implementation across data processing and experience layers. Outside of work, Suthan enjoys swimming, hiking, and exploring the Pacific Northwest.

Karthik Prabhakar

Karthik is a Data Processing Engines Architect for Amazon EMR at AWS. He specializes in distributed systems architecture and query optimization, working with customers to solve complex performance challenges in large-scale data processing workloads. His focus spans engine internals, cost optimization strategies, and architectural patterns that enable customers to run petabyte-scale analytics efficiently.

Kiki Nwangwu

Kiki is an Analytics and GenAI Specialist Solutions Architect at AWS. She specializes in helping customers architect, build, and modernize scalable data analytics and generative AI solutions. She enjoys traveling and exploring new cultures.

AI MS

Al is a product manager for Amazon EMR at Amazon Web Services.

GPS As a Key Distribution Platform

Post Syndicated from Bruce Schneier original https://www.schneier.com/blog/archives/2026/06/gps-as-a-key-distribution-platform.html

This is interesting:

The U.S. military has likely been quietly broadcasting codes for its global encryption network using public GPS for nearly 20 years, turning each satellite into a hidden “numbers station,” according to Steven Murdoch…

That means every device that uses GPS has been receiving hidden government information for years, and nobody outside the military knew it until now.

[…]

Murdoch discovered that this particular sentinel was transmitted by all 31 operational satellites within a window of a few hours on May 26, 2011, potentially heralding the activation of a new operational system. He confirmed that this timeline coincided with the rollout of the military’s Over-the-Air Distribution (OTAD) and the Over-the-Air Rekeying (OTAR) by cross-referencing declassified documents, including a 2015 presentation about the dates of the operation.

“There was a perfect match between the timeline and that presentation and the change points that were automatically identified from the data,” Murdoch said. “That was the smoking gun that made me think: This is what it’s for.”

These automated systems replaced the cumbersome manual distribution of cryptographic keying material, allowing military GPS receivers around the world to be rekeyed remotely through satellite broadcasts rather than through onsite procedures.

Asahi Linux warns users not to upgrade to macOS 27 beta

Post Syndicated from jzb original https://lwn.net/Articles/1077209/

The Asahi Linux project,
which brings Linux support to Apple Arm-based Macs, has warned
its users
not to upgrade to the macOS 27 “Golden Gate”
beta.

Apple has changed how the boot picker and Startup Disk applications
detect valid OS boot volumes. When using either from macOS 27, your
Asahi partition will not be visible! We believe this to be a bug, and
have filed a report (FB22994760).

If you have already upgraded to the beta and noticed that your
Asahi partition has disappeared, do not stress. Your Asahi partition
is still there, and you have not lost any data.

The Asahi Linux installer has been patched to prevent use with
macOS 27 for now, but any users already bitten by the change will
need to use macOS 26 to restore access to Asahi Linux.

[$] BPF loop verification with scalar evolution

Post Syndicated from daroc original https://lwn.net/Articles/1076121/

The BPF verifier has, in the course of wrestling with the difficult problem of
statically analyzing loops, grown special support for many kinds of loops over its
history, but its fundamental approach to simple for loops has not
changed.
When it encounters a loop, it evaluates it, iteration by iteration, until reaching
an exit condition — a process that can cause the verifier to mistakenly hit the
limit on the number of allowed instructions where a better implementation
would not.
Eduard Zingerman
spoke at the 2026
Linux Storage, Filesystem, Memory-Management, and BPF Summit
about his in-progress work on improving the verifier’s treatment of loops, especially nested
loops.

Rapid7 Gains Access To Anthropic’s Project Glasswing To Explore Frontier AI For Cybersecurity

Post Syndicated from Wade Woolwine original https://www.rapid7.com/blog/post/ai-rapid7-accesses-anthropics-project-glasswing-exploring-frontier-artificial-cybersecurity-intelligence

Wade Woolwine is Senior Director, Product Security at Rapid7.

Rapid7 is excited to join Anthropic’s Project Glasswing, which includes access to Claude Mythos Preview, giving our teams the opportunity to explore how frontier AI can support legitimate, internal defensive security workflows led by experienced security practitioners. Anthropic has now expanded Project Glasswing from its initial cohort to a broader group of organizations, underscoring how quickly this conversation is moving from model capability to industry readiness. 

This access comes at a critical moment for security operations. Attackers are moving faster, attack surfaces are expanding, and fragmented security data makes it harder for teams to correlate context and respond at scale. The industry is entering a period where powerful frontier AI models with advanced cyber capabilities require new operating norms, stronger safeguards, and better infrastructure for how vulnerabilities are verified, disclosed, fixed, and deployed.

Frontier AI will raise expectations for how quickly security teams can understand risk, make decisions, and prove that action has reduced exposure. Rapid7 has already been tracking what Project Glasswing means for security leaders: faster discovery is only part of the story, and the real test is how defenders handle everything that follows, from prioritization and remediation to validation, detection, and response. Rapid7’s involvement gives us another opportunity to help shape how advanced LLMs are evaluated and applied to real defensive security work.

The organizations best positioned to benefit from frontier AI will be those that pair advanced models with trusted security context, expert oversight, and mature operational workflows. That is the lens Rapid7 is bringing to our internal exploration of Claude Mythos Preview, and it reflects the same principle that guides our broader AI strategy: advanced technology delivers the most value when grounded in security expertise, operational context, and measurable outcomes.

Exploring Claude Mythos Preview inside Rapid7

In the first week of Rapid7’s access to Claude Mythos Preview, it has already given our researchers, security engineers, and analysts another way to explore how frontier AI can strengthen the security workflows we already rely on. Our use is internal and practitioner-led, with a focus on learning where these models can create defensive value, where human expertise remains essential, and where responsible guardrails are required.

Cybersecurity impact depends on more than model capability. A model may help identify a potential vulnerability and confirm exploitability, but reducing risk requires deeper operational work: understanding affected systems, mapping business context, prioritizing remediation, validating the fix, and ensuring detection coverage is in place. Anthropic’s latest Project Glasswing update reinforces that same shift: as AI makes discovery faster, the next challenge becomes helping the industry scale verification, disclosure, fixing, and deployment.

For more than 25 years, Rapid7 has helped organizations understand risk in real environments and take action against it. Access to Project Glasswing gives us another way to explore how LLMs can support that mission, while reinforcing the same principle that guides our broader AI strategy: advanced technology delivers the most value when grounded in security expertise, operational context, and measurable outcomes.

How Rapid7 is using Claude Mythos Preview internally

Our initial exploration is focused on internal defensive use cases that can help strengthen our product security, improve our research, and create better security outcomes overall. The goal is to understand how frontier AI can support highly specialized security work while helping us evaluate these capabilities with the discipline and caution they require.

In product security, we are exploring how Claude Mythos Preview can support assessment of our code and infrastructure, helping identify potential vulnerabilities, weaknesses, or risky patterns that traditional product security tools may miss. Used responsibly, this type of workflow can help engineering and product security teams reduce risk earlier in the development lifecycle.

We are also evaluating how frontier AI can support vulnerability validation and exploitation analysis in authorized environments. This includes exploring how models can help researchers reason across unfamiliar code, validate severity, build safe proof-of-concept exploit paths, and translate findings into practical remediation guidance.

Our work also includes zero-day research and frontier model evaluation. As models become more capable, security teams need a clear view of where they perform well, where they struggle, and how their outputs should be governed. Evaluating these models against vulnerability discovery and exploitation tasks helps Rapid7 understand their practical value, limitations, and safeguards.

We are also applying frontier AI to red-teaming, detection, and response research. As AI becomes more embedded in enterprise systems and security operations, it also needs to be tested adversarially. Frontier models can help practitioners explore attack paths, challenge assumptions, enrich investigations, reduce noise, and support faster decisions when paired with the right telemetry and human judgment.

Why frontier AI needs cybersecurity expertise

The industry conversation around frontier AI often starts with what models can find, especially as they become more capable at reasoning across large codebases and surfacing potential flaws. However, security teams reduce risk by knowing which findings matter, acting on them quickly, and proving that exposure has been reduced. As we’ve written before, the challenge is turning faster discovery into faster action, which requires teams to understand their environment well enough to apply emerging models with intent.

That is why expertise matters. AI can help accelerate parts of the workflow, but security impact comes from connecting discovery to validation, remediation, detection, and response. Without that connection, faster discovery can create more volume for teams that are already stretched. With the right context and operating model, it can help defenders move earlier and with more confidence.

This is the lens Rapid7 brings to Project Glasswing. Our teams are exploring these capabilities as practitioners who understand the real-world pressures customers face: incomplete asset visibility, fragmented ownership, growing vulnerability backlogs, expanding identity and cloud risk, and alert volumes that can outpace human-only workflows.

From frontier AI adoption to preemptive security

Rapid7’s broader strategy is focused on helping organizations move toward preemptive security, where exposure management and detection and response work together to disrupt attackers before risk becomes impact. As AI accelerates both attacker activity and defender workflows, security teams need more than faster vulnerability discovery. They need rich contextual prioritization, trusted AI-driven decision making, and mitigations beyond patching so they can prioritize, validate, and respond at speed and scale.

The next phase of cybersecurity will require speed, scale, and consistency across the entire security lifecycle. The industry challenge is expanding from finding vulnerabilities to the harder operational work of verifying, disclosing, fixing, and deploying remediations. While vulnerability and alert volumes will increase, cyber resilience depends on what happens both before and after discovery. In a reality where vulnerabilities can be exploited or chained together quickly, teams need the ability to prioritize exposures that have real impact, investigate quickly with full context, and keep operating in the face of disruption.

Preemptive security also means mitigation must extend beyond patching. Timely patching at scale is not always practical, so security teams need the ability to intercept and disrupt exploit paths through virtual patching, controls management, and rapid response actions. That is why Rapid7 is approaching frontier AI through the lens of preemptive security. Our AI foundation is built around unified security data and shared operational context across exposures, assets, identities, behavior, and activity, and transparent AI decisions validated by experts and governed by policy-driven workflows.

Access to Claude Mythos Preview is another step in exploring how LLMs can help security teams move earlier, act faster, and build more resilient programs without losing the human expertise and accountability that effective security requires. Anthropic also unveiled Fable 5 today, its first publicly available Mythos-class model, which will only further underscore the importance of having an integrated, AI-ready security plan that can turn this new benchmark of visibility into meaningful security improvement.

Security updates for Tuesday

Post Syndicated from jzb original https://lwn.net/Articles/1077163/

Security updates have been issued by AlmaLinux (bind and libyang), Debian (keystone and openssl), Fedora (mingw-objfw, objfw, sentencepiece, and tailscale), Mageia (packagekit and suricata), Oracle (bind, bind9.16, go-toolset:ol8, ImageMagick, kernel, samba, and vim), SUSE (apache-commons-lang3, apache-commons-text, apache-commons-configuration2, apache-commons-cli, apache-commons-io, apache-commons-codec, avahi, busybox, chromedriver, chromium, csync2, firewalld, frr, gleam, helm, kernel-devel, keybase-client, libmozjs-140-0, libopenvswitch-3_7-0, libsoup, memcached, mutt, openjpeg2, ovmf, perl-HTML-Parser, perl-Net-CIDR-Set, perl-Protocol-HTTP2, postgresql-jdbc, postgresql17, python-CairoSVG, python-Flask, python-pip, python-pyOpenSSL, python-python-multipart, python-Twisted, python-urllib3, python-urllib3_1, python-uv, python311, rsync, tomcat, and tree-sitter), and Ubuntu (alsa-lib, cups, inetutils, isc-kea, jpeg-xl, libnet-cidr-lite-perl, netatalk, netty, nginx, node-shell-quote, php-twig, pillow, poppler, rsync, strongswan, systemd, and transmission).

Linux App Summit 2026 (Heise)

Post Syndicated from corbet original https://lwn.net/Articles/1077084/

Heise is carrying a report from the Linux App Summit, held in Berlin in May.

The slightly more than a dozen talks were symbolically framed
between the opening keynote by systemd creator Lennart Poettering
and the closing talk by Jorge Castro, initiator of the Universal
Blue project, from which the modern Linux systems Bluefin and
Bazzite emerged. Both Castro and Poettering call for a fundamental
rethink of how Linux operating systems are delivered but pursue
different approaches.

Exploring AI Integration in Zabbix with Gemini and WebMCP

Post Syndicated from Cesar Caceres original https://blog.zabbix.com/exploring-ai-integration-in-zabbix-with-gemini-and-webmcp/33050/

When I first started working with Zabbix in banking and telecommunications over a decade ago, the workflow was always the same: something breaks, an alert fires, you open the dashboard, you diagnose, you fix. Every step required a human sitting in front of a screen reading charts and making decisions.

Then AI came along, and I started asking a simple question. What if I could just talk to my infrastructure and get answers? That question led me down a path from Telegram bots to WhatsApp integrations, and then from chatbots with custom modules to a full mobile application on the Google Play Store.

Along the way, I discovered that the real challenge is not connecting AI to Zabbix – it is defining how they should communicate. That is where protocols like MCP and WebMCP come in, and why they matter for anyone working in infrastructure monitoring today.

Phase 1: Just let me ask a question

The first thing I wanted was simple – to ask about my infrastructure in natural language and get a useful answer. Not parse JSON, not read raw metrics, just ask.

My early integrations used Telegram and WhatsApp as the interface. The AI (initially custom modules, later Gemini) would receive a question like “What alerts do I have right now?”, query the Zabbix API, and respond in plain language. It worked, but it was limited – the AI could only answer what I had explicitly programmed it to answer.
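As a rough illustration of what sits behind a question like “What alerts do I have right now?”, here is a minimal sketch, assuming a Zabbix 7.x API and an API token. The URL, the token, and the helper names (`build_problem_request`, `fetch_problems`, `summarize`) are placeholders invented for this example. `problem.get` and its `severities` parameter are part of the Zabbix API.

```python
import json
import urllib.request

# Placeholder values: replace with your own frontend URL and API token.
ZABBIX_URL = "https://zabbix.example.com/api_jsonrpc.php"
API_TOKEN = "replace-me"

# Zabbix severity levels, indexed 0..5.
SEVERITY_NAMES = ["Not classified", "Information", "Warning", "Average", "High", "Disaster"]


def build_problem_request(min_severity=0, request_id=1):
    """Build the JSON-RPC body for problem.get, filtered by minimum severity."""
    return {
        "jsonrpc": "2.0",
        "method": "problem.get",
        "params": {
            "output": ["eventid", "name", "severity", "acknowledged"],
            "severities": list(range(min_severity, 6)),
            "sortfield": ["eventid"],
            "sortorder": "DESC",
        },
        "id": request_id,
    }


def fetch_problems(min_severity=0):
    """Send the request. Not called in this sketch; it needs a live Zabbix server."""
    body = json.dumps(build_problem_request(min_severity)).encode()
    req = urllib.request.Request(
        ZABBIX_URL,
        data=body,
        headers={
            "Content-Type": "application/json-rpc",
            "Authorization": f"Bearer {API_TOKEN}",
        },
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)["result"]


def summarize(problems):
    """Turn raw problem objects into one plain-language sentence."""
    if not problems:
        return "No active problems."
    # The API returns severity as a string, so convert before comparing.
    worst = max(int(p["severity"]) for p in problems)
    return f"{len(problems)} active problem(s); highest severity is {SEVERITY_NAMES[worst]}."
```

In a chatbot, the output of `summarize(fetch_problems())` (or the raw list) would be handed to the model as context for its reply. The limitation described above follows directly: each new kind of question needs another hand-written helper like these.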

Phase 2: MCP gives AI a standard way to talk to Zabbix

The Model Context Protocol (MCP) developed by Anthropic solves a fundamental problem – how do you give an AI model structured access to external tools and data sources without reinventing the wheel every time?

Before MCP, every AI-to-Zabbix integration was custom. You wrote a script, parsed the API response, and formatted it for the model. If you wanted to switch from one AI provider to another, you started over. MCP standardizes this. You build an MCP server once, and any compatible AI client (Claude Desktop, Gemini CLI, or others) can use it.

The Zabbix community has already embraced this. There are now multiple open source MCP servers for Zabbix available on GitHub. You can request things like:

  • “Show me all unacknowledged problems with severity High or above”
  • “Create a maintenance window for host db-01 for 2 hours”
  • “What changed in the last 24 hours?”

Best of all, you can do it all through natural language and through a standardized protocol.
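To make the standardization concrete, here is a minimal sketch of the two JSON-RPC methods an MCP server answers for tools: `tools/list` and `tools/call`. The tool name `get_problems`, its schema, and the stub data are hypothetical. A real server would normally be built on an MCP SDK and would query the Zabbix API instead of a hard-coded list.

```python
import json

# One hypothetical tool. Real servers expose many and back them with Zabbix API calls.
TOOLS = {
    "get_problems": {
        "description": "List active Zabbix problems at or above a severity.",
        "inputSchema": {
            "type": "object",
            "properties": {"min_severity": {"type": "integer", "minimum": 0, "maximum": 5}},
            "required": ["min_severity"],
        },
    }
}

# Stub data standing in for a live problem.get result.
SAMPLE_PROBLEMS = [
    {"name": "High CPU on db-01", "severity": 4},
    {"name": "Disk space low on web-02", "severity": 2},
]


def get_problems(min_severity):
    """Return the stub problems at or above the requested severity."""
    return [p for p in SAMPLE_PROBLEMS if p["severity"] >= min_severity]


HANDLERS = {"get_problems": get_problems}


def handle(message):
    """Answer one JSON-RPC request in the shape MCP uses for tools."""
    method, params = message["method"], message.get("params", {})
    if method == "tools/list":
        result = {"tools": [{"name": n, **spec} for n, spec in TOOLS.items()]}
    elif method == "tools/call":
        name = params["name"]
        if name not in HANDLERS:
            return {"jsonrpc": "2.0", "id": message["id"],
                    "error": {"code": -32602, "message": f"Unknown tool: {name}"}}
        data = HANDLERS[name](**params.get("arguments", {}))
        result = {"content": [{"type": "text", "text": json.dumps(data)}], "isError": False}
    else:
        return {"jsonrpc": "2.0", "id": message["id"],
                "error": {"code": -32601, "message": "Method not found"}}
    return {"jsonrpc": "2.0", "id": message["id"], "result": result}
```

Because any compatible client discovers tools through `tools/list` and invokes them through `tools/call`, switching AI providers does not require rewriting this layer.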

In my own environment, I set up a WebMCP server that connects a FastAPI backend to the Zabbix API, exposing structured endpoints for hosts, alerts, and problems. The server runs 24/7 alongside my Zabbix instance on a dedicated Proxmox node.

With a simple query to the WebMCP server, I can retrieve the full list of monitored hosts, check active problems, view recent alerts with their severity levels, and get a usage summary – all through clean, structured JSON responses that any AI client can consume.
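The shape of such a server can be sketched without any framework. The following is an assumption-laden illustration, not my actual FastAPI code: the endpoint paths (`/health`, `/hosts`, `/problems`, `/usage`) and the sample data are made up for the example, and the stub lists stand in for live Zabbix API results.

```python
from collections import Counter

# Stub data standing in for live Zabbix API results.
HOSTS = [{"hostid": "10084", "host": "db-01"}, {"hostid": "10085", "host": "web-02"}]
PROBLEMS = [{"eventid": "501", "name": "High CPU on db-01", "severity": 4}]

usage = Counter()  # per-endpoint request counts, reported by /usage


def route(path):
    """Return (status_code, json_ready_body) for a GET on the given path."""
    routes = {
        "/health": lambda: {"status": "ok"},
        "/hosts": lambda: {"count": len(HOSTS), "hosts": HOSTS},
        "/problems": lambda: {"count": len(PROBLEMS), "problems": PROBLEMS},
        "/usage": lambda: {"requests": dict(usage)},
    }
    if path not in routes:
        return 404, {"error": f"unknown endpoint: {path}"}
    usage[path] += 1  # count the hit before building the response
    return 200, routes[path]()
```

Each handler returns a plain dictionary, so the same bodies can be serialized as JSON by whatever web framework fronts them.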

The WebMCP server exposes structured endpoints for health monitoring, usage tracking, and Zabbix data.
A live query to the WebMCP server returning real Zabbix alerts in structured JSON.

Phase 3: WebMCP becomes the interface

Looking ahead, WebMCP is a proposed browser standard (co-created by engineers at Google and Microsoft) that lets websites declare their capabilities as structured tools that AI agents can call directly in the browser.

Think about what this means for Zabbix. Today, the Zabbix frontend is a web application that humans navigate: click on hosts, drill into triggers, check graphs, acknowledge problems. An AI agent trying to use the Zabbix frontend would have to take screenshots, interpret the UI, and guess where to click, which is slow, fragile, and expensive.

With WebMCP, the Zabbix frontend could declare: “Here is a tool called get_active_problems. It needs a severity filter. Call it and I will return structured results.” The AI agent calls the function, gets clean data, and acts on it. No screenshots, no DOM scraping, no guessing.

The key differences from traditional MCP:

  • WebMCP runs inside the browser tab, not on a separate server. No additional infrastructure to deploy.
  • It inherits the user’s existing session: the same SSO, the same cookies, the same role-based permissions. No separate auth layer.
  • Tools are contextual: on a problems page, the agent sees problem-related tools. On a host configuration page, it sees host tools.
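WebMCP itself is a browser-side JavaScript API, so the Python below is only a language-neutral model of the ideas in this list, not the proposed API. Apart from `get_active_problems` and its severity filter, every name here is hypothetical, including the other tools, the two simplified roles, and the page keys.

```python
# Hypothetical tool catalogue: which tools each page declares and the role each needs.
PAGE_TOOLS = {
    "problems": [
        {"name": "get_active_problems", "params": {"severity": int}, "min_role": "user"},
        {"name": "acknowledge_problem", "params": {"eventid": str}, "min_role": "admin"},
    ],
    "host_config": [
        {"name": "update_host", "params": {"hostid": str}, "min_role": "admin"},
    ],
}

# Simplified role ordering; real Zabbix roles are more fine-grained.
ROLE_RANK = {"user": 1, "admin": 2}


def visible_tools(page, session_role):
    """Tools an agent sees: limited to the current page and the session's role."""
    rank = ROLE_RANK[session_role]
    return [t["name"] for t in PAGE_TOOLS.get(page, []) if ROLE_RANK[t["min_role"]] <= rank]


def call_tool(page, session_role, name, **args):
    """Validate a call against the declaration before it would run."""
    if name not in visible_tools(page, session_role):
        raise PermissionError(f"{name} is not available on this page for this session")
    spec = next(t for t in PAGE_TOOLS[page] if t["name"] == name)
    for param, expected in spec["params"].items():
        if not isinstance(args.get(param), expected):
            raise TypeError(f"{param} must be {expected.__name__}")
    return {"tool": name, "arguments": args}
```

For example, `visible_tools("problems", "user")` yields only `get_active_problems`, while the same session on the host configuration page sees no tools at all. The agent never needs its own credentials, because every check runs against the session the user already has.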

Chrome 146 already ships WebMCP experimentally. A broader stable release in Chrome is expected by the end of 2026.

To explore this concept in practice, I set up a WebMCP server in my environment, connected to my Zabbix instance.

The server exposes Zabbix data through a browser-based interface, allowing agents to query hosts, alerts, and problems directly from the browser tab.

The server itself is monitored by Zabbix, so I can track its resource consumption and ensure it does not impact the rest of the infrastructure, closing the loop between the tool and the platform it extends.

A WebMCP demo page displaying live Zabbix alerts fetched through the browser-based backend.

Why this matters for mobile monitoring

Today, if you want AI-assisted Zabbix monitoring on your phone, you need a dedicated app that connects to the Zabbix API, handles authentication, processes data, and presents it through an AI layer. That is what I built. It works, but it requires significant development effort.

WebMCP opens a different path. Imagine opening the Zabbix frontend in your mobile browser and having an AI assistant that can interact with it natively – no app required, no separate server, just the browser and the protocol. The assistant inherits your Zabbix session, sees only what your user role permits, and can help you triage incidents, assign tasks, and generate reports, all through the same web interface you already use.

We are not there yet. WebMCP is still in early preview, and the Zabbix frontend needs to implement the protocol. But the architectural direction is clear. The web is becoming agent-ready, and monitoring tools will benefit enormously from this shift.

The practical roadmap

If you work with Zabbix and want to start integrating AI today, here is how I see the progression:

  • Right now: Use MCP servers to connect AI assistants to the Zabbix API. The open-source options are mature, support Zabbix 7.x (and experimentally 8.0), and work with multiple AI clients. Start with read-only mode to explore safely.
  • Near term: Build purpose-specific integrations. Whether it is a mobile app, a chatbot, or a custom dashboard, the Zabbix API combined with models like Gemini or Claude can deliver real value: AI-generated weekly reports, intelligent alert triage, and natural language infrastructure queries.
  • Coming soon: Keep an eye on WebMCP. As it matures and browsers ship stable support, it will become the lowest-friction way to add AI capabilities to any web-based monitoring tool. The sites that become agent-ready first will have a compounding advantage.
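The “read-only mode” advice in the first step can be approximated with a small guard in front of the Zabbix API. This is a sketch of one possible approach, not how any particular MCP server implements it. The function names are hypothetical, and the rule is deliberately conservative: it forwards only methods of the form `<object>.get` and rejects everything else.

```python
def is_read_only(method):
    """Treat only Zabbix API '<object>.get' methods as safe to forward."""
    parts = method.split(".")
    return len(parts) == 2 and parts[0] != "" and parts[1] == "get"


def guard(request, read_only=True):
    """Return the request unchanged, or raise if read-only mode forbids it."""
    if read_only and not is_read_only(request["method"]):
        raise PermissionError(f"{request['method']} blocked: server is in read-only mode")
    return request
```

With this guard in place, a query such as `problem.get` passes through, while a request like “Create a maintenance window”, which maps to `maintenance.create`, is refused until read-only mode is switched off.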

Closing thoughts

The infrastructure monitoring world is at an inflection point. We have been watching dashboards and reading alerts for decades. The protocols that will let our infrastructure genuinely talk back to us are now emerging: MCP for backend integrations, WebMCP for browser-native interactions.

If you are still running a Zabbix version older than 7.0, this is the year to migrate. Older versions are losing support, and the newer API capabilities in 7.0+ are what make these AI integrations possible. Zabbix offers certification programs through Zabbix Academy, and their partner network can assist with migrations.

The post Exploring AI Integration in Zabbix with Gemini and WebMCP appeared first on Zabbix Blog.

The collective thoughts of the interwebz