
Cannot cast exception while reading data from Hive in Spark

Introduction

In this post I am going to talk about an exception I got while trying to read data from Hive using Spark, and how I debugged and resolved it. I will also explain how one can reproduce the issue, and by extension how to avoid it. The exception was:

java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.hive.serde2.io.DateWritable

Spark code

Here is the snippet of the code that I had in Spark:
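A minimal Scala sketch of that snippet, assuming a SparkSession named spark with Hive support enabled:

// Read only 5 records from the Hive table tendai_test.geonames_data.
val df = spark.sql("select * from tendai_test.geonames_data limit 5")

// The ClassCastException is thrown here, when Spark actually materialises the rows.
df.show()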

From the code snippet above, all I am doing is a simple select from a table called geonames_data, which is in a database called tendai_test, and I only want to read 5 records. The exception is thrown when the data is retrieved using the show function.

Debugging

Now, my first instinct was to check the schema of the table. Using the DataFrame's printSchema function, I can see the fields and their datatypes as shown below:
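A sketch of that check, with the kind of output it gives based on the table definition shown later in this post:

// Print the schema Spark gets for the table from the Hive metastore.
df.printSchema()

// Abridged output:
// root
//  |-- geonameid: integer (nullable = true)
//  |-- name: string (nullable = true)
//  ...
//  |-- modificationdate: date (nullable = true)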

From the schema above I can see all the fields and their datatypes, and everything is what I expect. However, the query I am using to retrieve the data does not do any cast, so there has to be another place where a difference could come from. The next place I checked was the schema read directly from the ORC files, as shown below:
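A sketch of that check, reading the ORC files behind the table directly and bypassing the metastore (the warehouse path is an assumption):

// Read the ORC files under the table's location directly.
val orcDf = spark.read.orc("/user/hive/warehouse/tendai_test.db/geonames_data")
orcDf.printSchema()

// Abridged output; note the type of modificationdate:
//  |-- modificationdate: string (nullable = true)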

From the schema above, I noticed a difference in datatype for the field modificationdate: from ORC it is a string, whereas from Hive it is a date.

Solution

A quick solution is to back up the table in Hive as follows:

create table tendai_test.geonames_data_backup as select * from tendai_test.geonames_data;

After backing up the data, drop the original table as follows:

drop table tendai_test.geonames_data purge;

Once the table is dropped, recreate it using the original geonames_data creation script, which is as follows:

create table tendai_test.geonames_data(
geonameid int,
name string,
asciiname string,
alternatenames string,
latitude int,
longitude int,
featureclass string,
featurecode string,
countrycode string,
cc2 string,
admin1code int,
admin2code int,
admin3code int,
admin4code int,
population int,
elevation string,
dem int,
timezone string,
modificationdate date)
STORED AS ORC;

Once that is done, populate the table using the backed up data from geonames_data_backup as follows:

insert into tendai_test.geonames_data select * from tendai_test.geonames_data_backup;

Back to the Spark code

If I run the Spark code again, I no longer get the exception. I just made a minor modification to the code, selecting only a few fields.
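A sketch of that re-run, with the same assumptions as before (the chosen fields are just an example):

// Select a few fields from the rebuilt table; show() now works without the exception.
spark.sql("select geonameid, name, modificationdate from tendai_test.geonames_data limit 5").show()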

Reproducing the issue

Using the backup table from above, we can recreate the issue as follows:
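One way to end up in this broken state, sketched below under the assumption that the bad write came from Spark itself: write the backup data back out with modificationdate cast to string, straight into the directory that backs the Hive table (the path is illustrative). The metastore then still says date while the ORC files say string, and the next read fails with the same ClassCastException.

import org.apache.spark.sql.functions.col

// Cast modificationdate to string and overwrite the ORC files behind the table.
spark.table("tendai_test.geonames_data_backup")
  .withColumn("modificationdate", col("modificationdate").cast("string"))
  .write
  .mode("overwrite")
  .format("orc")
  .save("/user/hive/warehouse/tendai_test.db/geonames_data")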

Lessons

The lesson here is that, when writing data with Spark, make sure that the datatypes are not changed, and when doing a cast, make sure that it is to the right datatype. Check the target schema first so that the right thing is being done.
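As a rough illustration of that check, assuming a DataFrame called df that is about to be written into the table:

import org.apache.spark.sql.functions.col

// Compare the DataFrame schema with the target table before writing,
// and cast modificationdate back to date if it has drifted to string.
df.printSchema()
spark.table("tendai_test.geonames_data").printSchema()

val fixed = df.withColumn("modificationdate", col("modificationdate").cast("date"))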

Doing this without checking might slow down production, especially when working in a big team. Say someone is reading data from geonames_data and expects modificationdate to be a date, and all of a sudden there is a cast exception: getting to the bottom of the issue and resolving it will slow down progress.

Conclusion

This is what I experienced and I just wanted to share it. In future posts I will talk about Hive, since there are some concepts in this post that I did not cover, for instance ORC and DataFrames.

Apache Spark 2

Introduction

In the article Apache Spark 1, I looked at a high-level overview of what Apache Spark is and its architecture. I touched on the fact that at the core of Apache Spark there is a very advanced Directed Acyclic Graph (DAG) data processing engine. In this article I will look into the DAG and how it works in Spark.

What is a Directed Acyclic Graph (DAG)?

Spark is efficient due to its advanced Directed Acyclic Graph (DAG) data processing engine. To understand how a DAG works, we have to break the term down and define each word.

Graph, what does it mean? It is a diagram representing a system of connections or interrelations among two or more things, using a number of distinctive dots, lines, bars and so forth. In our case, a DAG, the graph is a representation of nodes connected by what are known as edges.

Acyclic, what does it mean? It is an adjective that describes a graph in which there is no cycle or closed path.

Directed, what does it mean? In the context of a DAG, it means each edge has a direction to take, represented by an arrow.

Consider a graph with a collection of nodes denoted by the letters A to G, connected by edges that have specific directions, hence the arrows. If we follow the movement from, say, A, we will never get back to A, hence the graph is acyclic. Because there is no circular movement, the graph enables dependency tracking: for instance, B depends on A, F on E, and so forth. If we think of it as a data processing pipeline, A and G are the sources of the data, which goes through several transformation steps until it gets to F, the final result.

Now, in the context of Spark, our nodes are RDDs and the arrows are transformations. Below is an example of how the DAG works in Spark:
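A minimal Scala sketch of that example, assuming a SparkContext named sc and the Spark README.md as input:

// RDD of lines read from the file.
val textFile = sc.textFile("README.md")

val counts = textFile
  .flatMap(line => line.split(" "))   // RDD of words
  .map(word => (word, 1))             // RDD of (word, 1) pairs
  .reduceByKey(_ + _)                 // sum the counts for each word

counts.collect()                      // action that triggers the whole DAG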

This is a simple word count done over a README file. Visually, the process looks as follows:

The textFile is our first RDD, whose elements are lines. flatMap transforms it into another RDD whose elements are words, and map then transforms each word into a key-value pair. reduceByKey then sums up the counts for each word. Spark comes with its own DAG visualisation tool, which shows this job split into two stages, Stage 0 and Stage 1.

Now, if we go back to the first example, where we used letters as nodes, we established that letters A and G represent the sources of our data and that the final result is letter F, after the data has gone through some transformations. It is the same with the example above: the README file is the source, and as the data moves from Stage 0 to Stage 1 it goes through some transformations. But then one would ask: why the split between Stage 0 and Stage 1? Why is reduceByKey in its own stage?

As I mentioned earlier, one of the advantages of a DAG is the tracking of dependencies. The textFile has to be flatMapped -> mapped -> reducedByKey. Each line from the README file is transformed into words, and each word has no dependency on the other words. However, when we want to find the number of occurrences of each word, words have to be moved around. In Spark this is known as shuffling, and wherever shuffling is needed, Spark sets a boundary between stages. An operation that results in a shuffle is placed in its own stage, hence reduceByKey is in Stage 1. Once that is done, the tasks in each stage are bundled together and sent to the executors.
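One rough way to see those stage boundaries, assuming the counts RDD from the word count sketch above: toDebugString prints the RDD lineage, with indentation marking where a shuffle (and therefore a new stage) comes in.

// Print the lineage of the word-count RDD; the indentation steps mark where
// a shuffle is introduced, e.g. at reduceByKey.
println(counts.toDebugString)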

Conclusion

In this article I explained what a DAG is in the Spark context and gave an example of how it works in Spark. This is just scratching the surface, since a lot more is involved when running a Spark application. For instance, when a Spark application is submitted, one needs to understand how to set Spark configurations for it to run efficiently. Questions like how much driver memory to use, and how many executors or cores per executor are needed, will have to be answered.

When developing a Spark application, one also needs to understand what a transformation is and what an action is in the context of Spark. Understanding these terms will help in developing an application that executes efficiently.

I will try to cover these concepts in the next articles on Apache Spark.

Source
Acyclic 
DAG 
Execution plan 


Apache Spark 1

Introduction

In the article Big Data Handled Using Apache Hadoop, I looked at what Hadoop is and how it is used to handle Big Data. In conclusion, I noted that Hadoop is an aggregation of different modules whose purpose is to store and process Big Data in an efficient and secure manner. One of the modules that can be used on top of Hadoop is Apache Spark. In this article, the first in a series on Apache Spark, I am going to give a high-level overview of what Apache Spark is.

What is Apache Spark?

Spark, an open source project, is a general-purpose cluster computing platform that takes advantage of parallelism by distributing processing across a cluster of nodes in order to process data very fast. What makes Spark fast is that it does its processing in the main memory of the worker nodes, which avoids unnecessary input and output operations against disk. According to the Apache Spark documentation, Spark is capable of running programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.

Matei Zaharia created Spark in 2009 as a project within the AMPLab at the University of California, Berkeley. Spark was donated to the Apache Software Foundation in 2013 and was promoted to a Top-Level Apache Project in 2014. It is one of the most active projects managed by Apache, with more than 500 contributors from across 200 organizations and a user base of more than 225,000 members. Among the contributors are well-funded corporations such as IBM, Databricks and China's Huawei.

Spark’s Architecture

Spark comes with a very advanced Directed Acyclic Graph (DAG) data processing engine. On top of that engine, Spark has a stack of domain-specific libraries that provide different functionalities for different big data processing needs:

  • Spark SQL enables the use of SQL statements inside Spark applications.
  • Spark Streaming enables processing of live data streams.
  • Spark MLlib enables development of machine learning applications.
  • Spark GraphX enables graph processing and supports a growing library of graph algorithms.

Spark can run in:

  • a standalone mode on a single node having a supported OS.
  • a cluster mode on either Hadoop YARN or Apache Mesos.
  • the Amazon EC2 cloud, as well as on Kubernetes (an open source system for automating the deployment, scaling and management of containerized applications).

In a distributed application such as Spark, there is a driver program that controls the execution, and there are one or more worker nodes. The driver program allocates tasks to the appropriate workers. In Spark, the SparkContext created by the driver program communicates with the appropriate cluster manager (which can be YARN, standalone or Mesos) to run the tasks.
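A minimal Scala sketch of where these pieces meet; the master URL decides which cluster manager the SparkContext talks to (all names and values here are illustrative):

import org.apache.spark.sql.SparkSession

// Build the driver-side entry point. The master URL selects the cluster
// manager: "yarn", "mesos://host:port", "spark://host:port" for a standalone
// cluster, or "local[*]" for a single node.
val spark = SparkSession.builder()
  .appName("spark-overview-example")   // illustrative application name
  .master("local[*]")
  .getOrCreate()

val sc = spark.sparkContext            // the SparkContext coordinating the workers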

Conclusion

This is just a high-level overview of what Apache Spark is and of its architecture. In the next articles in this series, I will take a deeper dive into Apache Spark.

Source:
Apache Spark Docs
Apache Spark Architecture Explained
