Updated: Apr 3, 2019
The Structured APIs are a newer programming abstraction in Spark: DataFrames were introduced in Spark 1.3 and Datasets in Spark 1.6. They are the preferred way of performing data processing for the majority of use cases. The Structured APIs were designed to enhance developers’ productivity with easy-to-use, intuitive, and expressive APIs.
The DataFrame concept was inspired by the Python pandas DataFrame. The main differences are:
a). A DataFrame in Spark can handle a large volume of data that is spread across many machines.
b). A DataFrame in Spark is backed by the Catalyst optimizer, which does the hard work behind the scenes to speed up your data processing logic.
Introduction to DataFrames
A DataFrame is an immutable, distributed collection of data that is organized into rows, where each row consists of a set of columns and each column has a name and an associated type.
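As a minimal sketch of that definition, the following builds a small DataFrame from local data and inspects its schema. The object name DataFrameBasics and the three sample rows are illustrative assumptions, not data from a real dataset; the column names follow the movie examples used later in these notes.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameBasics {
  // Build a tiny DataFrame; toDF assigns a name to each column.
  // The rows are hypothetical sample data.
  def movies(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("Coach Carter", 2005L),
      ("Superman II", 1980L),
      ("Ray", 2004L)
    ).toDF("movie_title", "produced_year")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DataFrameBasics")
      .master("local[*]")
      .getOrCreate()

    val df = movies(spark)
    df.printSchema() // every column has a name and a type

    // A transformation returns a new DataFrame; df itself is never modified.
    val recent = df.filter(df("produced_year") >= 2000)
    recent.show()
    df.show()

    spark.stop()
  }
}
```

Because a DataFrame is immutable, df still holds all three rows after the filter; only recent reflects the condition.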
Creating DataFrames by Reading Parquet Files
Parquet is one of the most popular open source columnar storage formats in the Hadoop ecosystem; it was created jointly by Twitter and Cloudera. It is popular because it is a self-describing data format (the schema is stored with the data) and because it stores data in a highly compact structure by leveraging compression and encoding. The columnar storage format is designed to work well with data analytics workloads where only a small subset of the columns is used during the analysis. Within a Parquet file, the values of each column are stored together in their own column chunks rather than interleaved row by row; therefore, columns that are not needed in an analysis do not have to be read. Parquet is also quite flexible when it comes to supporting complex data types with a nested structure.
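A minimal sketch of reading Parquet into a DataFrame might look like the following. It first writes a few hypothetical rows to a temporary directory so that the example is self-contained; the object name ParquetRoundTrip, the path, and the sample values are assumptions made for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

object ParquetRoundTrip {
  // Write hypothetical sample rows as Parquet, then read them back.
  def roundTrip(spark: SparkSession, path: String): DataFrame = {
    import spark.implicits._
    val movies = Seq(
      ("Actor A", "Coach Carter", 2005L),
      ("Actor B", "Superman II", 1980L),
      ("Actor C", "Ray", 2004L)
    ).toDF("actor_name", "movie_title", "produced_year")

    movies.write.mode("overwrite").parquet(path)

    // No schema is supplied here: Parquet files describe themselves.
    spark.read.parquet(path)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ParquetRoundTrip")
      .master("local[*]")
      .getOrCreate()

    val path = Files.createTempDirectory("spark-demo")
      .resolve("movies.parquet").toString
    val loaded = roundTrip(spark, path)
    loaded.printSchema()

    // Selecting one column lets Spark skip the other columns on disk;
    // the physical plan printed by explain() lists the columns read.
    loaded.select("movie_title").explain()

    spark.stop()
  }
}
```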
Creating DataFrames by Reading ORC Files
Optimized Row Columnar (ORC) is another popular open source self-describing columnar storage format in the Hadoop ecosystem. It was created by Hortonworks, in collaboration with Facebook, as part of an initiative to massively speed up Hive. It is quite similar to Parquet in terms of efficiency and speed and was designed for analytics workloads.
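ORC goes through the same reader and writer API as the other built-in formats. The sketch below uses the generic format("orc") form and assumes Spark 2.3 or later, where a native ORC implementation ships with Spark; older 2.x releases need Hive support enabled. The object name OrcRoundTrip and the sample rows are illustrative.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

object OrcRoundTrip {
  // Write `df` as ORC under `path`, then load it back.
  def roundTrip(spark: SparkSession, df: DataFrame, path: String): DataFrame = {
    df.write.format("orc").mode("overwrite").save(path)
    // Equivalent shorthand: spark.read.orc(path)
    spark.read.format("orc").load(path)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("OrcRoundTrip")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows.
    val movies = Seq(("Coach Carter", 2005L), ("Ray", 2004L))
      .toDF("movie_title", "produced_year")

    val path = Files.createTempDirectory("spark-demo")
      .resolve("movies.orc").toString
    roundTrip(spark, movies, path).printSchema() // schema comes from the files

    spark.stop()
  }
}
```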
Unlike the RDD operations, the structured operations (a domain-specific language, or DSL, for distributed data manipulation) are designed to be more relational, meaning these operations mirror the kind of expressions you can write in SQL, such as projection, filtering, transforming, joining, and so on.
Working with Columns
There are several ways to specify a column of a Spark DataFrame. The $"name" and 'name shorthands, made available by importing spark.implicits._, are syntactic sugar in Scala for constructing a Column object.
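A sketch of the common ways to refer to a column is shown below. All six expressions resolve to the same movie_title column; the object name ColumnStyles and the sample row are assumptions for illustration.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, column, expr}

object ColumnStyles {
  // Six ways to refer to the movie_title column of `df`.
  def titleColumn(spark: SparkSession, df: DataFrame): Seq[Column] = {
    import spark.implicits._ // enables the $"..." and 'symbol shorthands
    Seq(
      col("movie_title"),    // function from org.apache.spark.sql.functions
      column("movie_title"), // same as col
      expr("movie_title"),   // parsed from a SQL expression string
      df("movie_title"),     // bound to this particular DataFrame
      $"movie_title",        // string interpolator
      'movie_title           // Scala symbol (deprecated syntax in Scala 2.13)
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ColumnStyles")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample row.
    val movies = Seq(("Ray", 2004L)).toDF("movie_title", "produced_year")
    titleColumn(spark, movies).foreach(c => movies.select(c).show())

    spark.stop()
  }
}
```

The df("movie_title") form is the one to reach for when two DataFrames in a join share a column name, because it ties the column to a specific DataFrame.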
Working with Structured Transformations
movies.select('movie_title,('produced_year - ('produced_year % 10)).as("produced_decade")).show(5)
movies.selectExpr("*","(produced_year - (produced_year % 10)) as decade").show(5)
movies.filter('produced_year >= 2000 && length('movie_title) < 5).show(5)
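The three calls above assume an existing movies DataFrame, the implicits import, and the length function. A self-contained sketch with hypothetical sample rows follows; it uses the $"..." interpolator in place of the 'symbol shorthand, and the object name MovieTransformations is an assumption.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.length

object MovieTransformations {
  // Hypothetical sample rows standing in for the movies dataset.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(("Coach Carter", 2005L), ("Superman II", 1980L), ("Ray", 2004L))
      .toDF("movie_title", "produced_year")
  }

  // Projection with a derived column: 2005 becomes decade 2000.
  def withDecade(spark: SparkSession, movies: DataFrame): DataFrame = {
    import spark.implicits._
    movies.select(
      $"movie_title",
      ($"produced_year" - ($"produced_year" % 10)).as("produced_decade"))
  }

  // Filtering on two conditions combined with &&.
  def recentShortTitles(spark: SparkSession, movies: DataFrame): DataFrame = {
    import spark.implicits._
    movies.filter($"produced_year" >= 2000 && length($"movie_title") < 5)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("MovieTransformations")
      .master("local[*]")
      .getOrCreate()

    val movies = sample(spark)
    withDecade(spark, movies).show(5)
    // selectExpr accepts the same logic as a SQL expression string.
    movies.selectExpr("*", "(produced_year - (produced_year % 10)) as decade").show(5)
    recentShortTitles(spark, movies).show(5)

    spark.stop()
  }
}
```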
Introduction to Datasets
Starting with the Spark 2.0 release, there is only one high-level abstraction, called a Dataset, which has two flavors: a strongly typed API and an untyped API. The term DataFrame didn’t go away; instead, it has been redefined as an alias for a Dataset of generic objects. In code terms, a DataFrame is simply a type alias for Dataset[Row], where a Row is a generic untyped JVM object. A Dataset, in contrast, is a collection of strongly typed JVM objects, represented by either a case class in Scala or a class in Java.
A Dataset is a strongly typed, immutable collection of data. Similar to a DataFrame, the data in a Dataset is mapped to a defined schema. However, there are a few important differences between a DataFrame and a Dataset.
1). Each row in a Dataset is represented by a user-defined object so that you can refer to an individual column as a member variable of that object. This provides you with compile-time type safety.
2). A Dataset has helpers called encoders, which are smart and efficient encoding utilities that convert data inside each user-defined object into a compact binary format. This translates into a reduction of memory usage if and when a Dataset is cached in memory as well as a reduction in the number of bytes that Spark needs to transfer over a network during the shuffling process.
In terms of limitations, the Dataset APIs are available only in strongly typed languages such as Scala and Java.
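A minimal sketch of the typed API is shown below. The Movie case class, the object name DatasetBasics, and the sample rows are assumptions for illustration; the encoder for Movie is derived through spark.implicits._.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Defined at the top level so that Spark can derive an encoder for it.
case class Movie(actor_name: String, movie_title: String, produced_year: Long)

object DatasetBasics {
  // Hypothetical sample rows as a strongly typed Dataset.
  def fromLocal(spark: SparkSession): Dataset[Movie] = {
    import spark.implicits._
    Seq(
      Movie("Actor A", "Coach Carter", 2005L),
      Movie("Actor B", "Superman II", 1980L),
      Movie("Actor C", "Ray", 2004L)
    ).toDS()
  }

  // Columns are plain fields here, so a misspelled field fails to compile.
  def recentTitles(spark: SparkSession, movies: Dataset[Movie]): Dataset[String] = {
    import spark.implicits._
    movies.filter(m => m.produced_year >= 2000).map(m => m.movie_title)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DatasetBasics")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val ds = fromLocal(spark)
    recentTitles(spark, ds).show()

    val df = ds.toDF()            // untyped view: Dataset[Row]
    val typedAgain = df.as[Movie] // back to the typed view
    typedAgain.show()

    spark.stop()
  }
}
```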
A case class in the Scala language is like a JavaBean class in the Java language; however, it has a few interesting built-in properties. An instance of a case class is immutable, and therefore it is commonly used to model domain-specific objects. In addition, it is easy to reason about the internal state of case class instances because they are immutable. The toString and equals methods are automatically generated to make it easier to print out the content of a case class and to compare different case class instances. Case classes also work well with the pattern matching feature of the Scala language.
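These properties can be seen in a few lines of plain Scala, with no Spark involved. The Film class and the describe helper are hypothetical names introduced only for this sketch.

```scala
// A hypothetical domain object; its fields are immutable vals.
case class Film(title: String, year: Int)

object CaseClassDemo {
  // Pattern matching destructures a case class by its constructor fields.
  def describe(f: Film): String = f match {
    case Film(t, y) if y >= 2000 => s"$t is from this century"
    case Film(t, _)              => s"$t is older"
  }

  def main(args: Array[String]): Unit = {
    val a = Film("Ray", 2004)
    val b = Film("Ray", 2004)

    println(a)      // generated toString prints Film(Ray,2004)
    println(a == b) // generated equals compares field values: true

    // Fields cannot be reassigned; copy creates a modified new instance.
    val c = a.copy(year = 1980)
    println(describe(a))
    println(describe(c))
  }
}
```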
Running SQL in Spark
Spark provides a few different ways to run SQL.
1. Spark SQL CLI (./bin/spark-sql)
2. JDBC/ODBC server
3. Programmatically in Spark applications
The first two options provide an integration with Apache Hive to leverage the Hive metastore, which is a repository that contains the metadata and schema information about the various system and user-defined tables. This section covers only the last option.
DataFrames and Datasets are essentially like tables in a database. Before you can issue SQL queries against them, you need to register them as temporary views. Each view has a name, and that name is what is used as the table name in the FROM clause of a query. Spark provides two levels of scoping for temporary views.
- One is at the Spark session level. When a DataFrame is registered at this level, only the queries that are issued in the same session can refer to that DataFrame. A session-scoped view disappears when its Spark session is closed.
- The second scoping level is the global level, which means these views are available to SQL statements in all Spark sessions of the same Spark application. Global temporary views live in the reserved global_temp database, so queries refer to them with a qualified name such as global_temp.view_name. All the registered views are maintained in the Spark metadata catalog, which can be accessed through SparkSession.
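The two scoping levels can be sketched as follows. The view names movies and movies_global, the object name TempViews, and the sample rows are assumptions chosen for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TempViews {
  // Register the same DataFrame at both scoping levels.
  def register(movies: DataFrame): Unit = {
    movies.createOrReplaceTempView("movies")              // session scoped
    movies.createOrReplaceGlobalTempView("movies_global") // application scoped
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TempViews")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows.
    val movies = Seq(("Coach Carter", 2005L), ("Superman II", 1980L), ("Ray", 2004L))
      .toDF("movie_title", "produced_year")
    register(movies)

    // The view name plays the role of a table name in the FROM clause.
    spark.sql("SELECT movie_title FROM movies WHERE produced_year >= 2000").show()

    // A second session sees only the global view, via the global_temp database.
    val other = spark.newSession()
    other.sql("SELECT * FROM global_temp.movies_global").show()
    // other.sql("SELECT * FROM movies") would fail: that view belongs to `spark`.

    // Registered views are visible through the catalog.
    spark.catalog.listTables().show()

    spark.stop()
  }
}
```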
Error type        SQL        DataFrame      Dataset
Syntax errors     Runtime    Compile time   Compile time
Analysis errors   Runtime    Runtime        Compile time
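The table can be observed directly. In the sketch below, a malformed SQL string and a misspelled DataFrame column both compile and fail only when the program runs, whereas the equivalent mistake on a typed Dataset is rejected by the compiler. The Flight case class, the object name ErrorTiming, and the misspelling origin_stat are assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Try

// Hypothetical record type with the columns used in the flight examples.
case class Flight(origin_state: String, count: Long)

object ErrorTiming {
  // SQL: the typo in SELECT is only discovered when the string is parsed.
  def badSql(spark: SparkSession): Try[DataFrame] =
    Try(spark.sql("SELEC 1"))

  // DataFrame: this compiles, but the unknown column fails analysis at runtime.
  def badColumn(spark: SparkSession): Try[DataFrame] = {
    import spark.implicits._
    val flights = Seq(Flight("CA", 10L)).toDF()
    Try(flights.select("origin_stat"))
  }

  // Dataset: the same typo never reaches runtime. With flights: Dataset[Flight],
  //   flights.map(f => f.origin_stat)
  // is rejected by the Scala compiler because Flight has no such field.

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ErrorTiming")
      .master("local[*]")
      .getOrCreate()

    println(badSql(spark).isFailure)
    println(badColumn(spark).isFailure)

    spark.stop()
  }
}
```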
.where('origin_state === "CA").orderBy('count.desc).show(5)
Multiple Aggregations per Group