Tendai

On anything that got to do with software development

Category: Big Data

Cannot cast exception while reading data from Hive in Spark

Introduction

In this post I am going to talk about an exception I got while trying to read data from Hive using Spark, and how I debugged and resolved the issue. I will also explain how to reproduce the issue, because knowing how it is reproduced also shows how to avoid it. The exception was:

java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.hive.serde2.io.DateWritable

Spark code

Here is the snippet of the code that I had in Spark:

From the code snippet above, all I am doing is a simple select from a table called geonames_data which is in a database called tendai_test and I just want to read only 5 records. The exception is thrown when trying to retrieve the data using the show function.

Debugging

Now, my first instinct was to check the schema of the table. Using the DataFrame’s printSchema function, I can see the fields and their datatypes as shown below:

From the schema above I can see that all the fields and their datatypes are what I expect. However, the query I am using to retrieve the data does not do a cast anywhere. Therefore, there has to be another place where I can check for a difference. The next thing I did was to read the schema directly from ORC, as shown below:

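From the schema above, I noticed that the datatype of a field called modificationdate is different: in ORC it is a string, whereas in Hive it is a date. This explains the exception: the ORC files hold the field as text (org.apache.hadoop.io.Text), while the Hive table definition tells the reader to expect a date (DateWritable).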
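The comparison I did by eye can also be automated. Below is a minimal sketch in plain Python, not the code from my original session. It assumes each schema has already been turned into a dictionary of field name to datatype (in PySpark, something like dict(df.dtypes) should give that shape). The function name diff_schemas and the trimmed-down schemas are made up for illustration.

```python
def diff_schemas(hive_schema, orc_schema):
    """Return {field: (hive_type, orc_type)} for every field whose types differ."""
    mismatches = {}
    for field, hive_type in hive_schema.items():
        orc_type = orc_schema.get(field)  # None if the field is missing in ORC
        if orc_type != hive_type:
            mismatches[field] = (hive_type, orc_type)
    return mismatches


# Hypothetical, trimmed-down schemas: only three of the table's fields are shown.
hive_schema = {"geonameid": "int", "name": "string", "modificationdate": "date"}
orc_schema = {"geonameid": "int", "name": "string", "modificationdate": "string"}

print(diff_schemas(hive_schema, orc_schema))
# {'modificationdate': ('date', 'string')}
```

Running a check like this before reading the table would have pointed straight at modificationdate.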

Solution

A quick solution is to back up the table in Hive as follows:

create table tendai_test.geonames_data_backup as select * from tendai_test.geonames_data;

After backing up the data, drop the original table as follows:

drop table tendai_test.geonames_data purge;

Once the table is dropped, recreate the geonames_data table using the original script, which is as follows:

create table tendai_test.geonames_data(
geonameid int,
name string,
asciiname string,
alternatenames string,
latitude int,
longitude int,
featureclass string,
featurecode string,
countrycode string,
cc2 string,
admin1code int,
admin2code int,
admin3code int,
admin4code int,
population int,
elevation string,
dem int,
timezone string,
modificationdate date)
STORED AS ORC;

Once that is done, populate the table using the backed-up data from geonames_data_backup as follows:

insert into tendai_test.geonames_data select * from tendai_test.geonames_data_backup;

Back to the Spark code

If I run the Spark code again, the exception is no longer thrown, as shown below:

I only made a minor modification to the code, selecting a few fields instead of all of them.

Reproducing the issue

Using the backup table from the above, we can recreate the issue as follows:

Lessons

The lesson here is that, when writing data with Spark, make sure the datatypes of existing fields are not changed. When a cast is needed, check first that it is to the right datatype.

Skipping this check can slow down production, especially in a big team. Say someone is reading data from geonames_data and expects the datatype of modificationdate to be date, and then all of a sudden there is a cast exception. Getting to the bottom of the issue and resolving it will slow down progress.

Conclusion

This is what I experienced and I just wanted to share it. In future posts, I will talk about Hive, since there are some concepts in this post that I didn’t cover, for instance ORC and DataFrames.

Apache Spark 2

Introduction

In the article, Apache Spark 1, I gave a high-level overview of what Apache Spark is and its architecture. I touched on the fact that at the core of Apache Spark there is a very advanced Directed Acyclic Graph (DAG) data processing engine. In this article I will look into the DAG and how it works in Spark.

What is a Directed Acyclic Graph (DAG)?

Spark is efficient due to its advanced Directed Acyclic Graph (DAG) data processing engine. To understand how DAG works, we have to break it down and define each word.

Graph, what does it mean? It is a diagram representing a system of connections or interrelations among two or more things by a number of distinctive dots, lines, bars and so forth. In our case, which is DAG, the graph is a representation of nodes that are connected by what are known as edges.

Acyclic, what does it mean? It is an adjective that describes a graph in which there is no cycle or closed path.

Directed, what does it mean? In the context of DAG, it is the direction an edge has to take, and it is represented by an arrow.

From the diagram above, we have a graph with a collection of nodes denoted by the letters A to G, connected by edges which have specific directions, hence the arrows. If we follow the arrows, say from A, we will never get back to A, hence the graph is acyclic. Because there is no circular movement, dependencies can be tracked: for instance, B depends on A, F depends on E, and so forth. If we think of it as a data processing pipeline, A and G are the sources of the data, which goes through several transformation steps until it gets to F, the final result.
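To make dependency tracking concrete, here is a small sketch using Python's standard graphlib module. The edges are an assumption on my part for illustration. Only "B depends on A", "F depends on E", the sources A and G, and the final result F come from the description above; the rest is made up.

```python
from graphlib import CycleError, TopologicalSorter

# Each node maps to the set of nodes it depends on.
# Assumed edges: A -> B -> C -> E -> F and G -> D -> E.
depends_on = {
    "B": {"A"},
    "C": {"B"},
    "D": {"G"},
    "E": {"C", "D"},
    "F": {"E"},
}

# A valid processing order: every node appears after the nodes it depends on.
order = list(TopologicalSorter(depends_on).static_order())
print(order)


def has_cycle(graph):
    """Return True if the dependency graph contains a cycle."""
    try:
        list(TopologicalSorter(graph).static_order())
        return False
    except CycleError:
        return True


print(has_cycle(depends_on))                   # acyclic graph
print(has_cycle({"A": {"B"}, "B": {"A"}}))     # A and B depend on each other
```

An ordering like this only exists because the graph is acyclic. As soon as two nodes depend on each other, there is no valid place to start.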

Now in the context of Spark, our nodes are RDDs and the arrows are Transformations. Below is an example of how DAG works in Spark:

This is a simple word count that will be done for a ReadMe file. Visually the process will look as follows:

The textFile is our first RDD, whose elements are lines. flatMap transforms it into another RDD whose elements are words, and map then transforms each word into a key-value pair RDD. reduceByKey then sums the counts for each word. Spark comes with its own DAG visualisation tool. From that tool the DAG diagram will look as follows:
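For readers without a Spark cluster at hand, the same three steps can be imitated in plain Python. This is only a sketch of what each transformation does to the data, not Spark code: the two sample lines stand in for the ReadMe file, and everything runs on one machine.

```python
from collections import defaultdict

# Stands in for the RDD produced by textFile: one element per line.
lines = ["spark is fast", "spark is general"]

# flatMap: every line becomes many words, flattened into one collection.
words = [word for line in lines for word in line.split()]

# map: every word becomes a (word, 1) key-value pair.
pairs = [(word, 1) for word in words]

# reduceByKey: the values of identical keys are summed.
totals = defaultdict(int)
for word, count in pairs:
    totals[word] += count
counts = dict(totals)

print(counts)
# {'spark': 2, 'is': 2, 'fast': 1, 'general': 1}
```

The first two steps look at one element at a time, while the last one needs all the pairs with the same key in one place.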

Now, if we look at the first DAG diagram, where we used letters as nodes, we established that A and G represent the sources of our data and that F is the final result, after the data has gone through some transformations. The same applies to the example above: the ReadMe file is the source and, as the data moves from Stage 0 to Stage 1, it goes through some transformations. But then one would ask, why the split between Stage 0 and Stage 1? Why is reduceByKey in its own stage?

As I mentioned earlier, one of the advantages of a DAG is the tracking of dependencies. The textFile has to be flatMapped -> mapped -> reducedByKey. Each line from the ReadMe file is transformed into words, and each word does not depend on the other words. However, when we want to find the number of occurrences of each word, all the occurrences of the same word have to be brought together, which means moving data between partitions. In Spark, this is known as shuffling, and wherever shuffling is needed, Spark sets a boundary between stages. A transformation that results in shuffling starts a new stage, hence reduceByKey is in Stage 1. Once that is done, the tasks in each stage are bundled together and sent to the executors.
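The stage split can be imitated with a few lines of Python. This is a heavily simplified sketch and not how Spark's scheduler is implemented: it assumes a straight chain of transformations and a hand-written set (SHUFFLING, a name I made up) of the transformations that need a shuffle.

```python
# Assumption: transformations that need a shuffle are listed by hand.
SHUFFLING = {"reduceByKey", "groupByKey"}


def split_into_stages(transformations):
    """Cut a chain of transformations into stages at every shuffle."""
    stages = [[]]
    for name in transformations:
        # A shuffling transformation starts a new stage
        # (unless the current stage is still empty).
        if name in SHUFFLING and stages[-1]:
            stages.append([])
        stages[-1].append(name)
    return stages


print(split_into_stages(["textFile", "flatMap", "map", "reduceByKey"]))
# [['textFile', 'flatMap', 'map'], ['reduceByKey']]
```

The two lists correspond to Stage 0 and Stage 1 of the word count.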

Conclusion

In this article, I explained what a DAG is in the context of Spark and gave an example of how it works. This is just scratching the surface, since more is involved when running a Spark application. For instance, when a Spark application is submitted, one needs to understand how to set the Spark configuration for Spark to work efficiently. Questions such as how much driver memory to use, how many executors are needed and how many cores each executor should have will need to be answered.

When developing a Spark application, one also needs to understand what a transformation is and what an action is in the context of Spark. Understanding these terms will help in developing an application that will be executed efficiently.

I will try to cover these concepts in the next articles on Apache Spark.

Source
Acyclic 
DAG 
Execution plan

Apache Spark 1

Introduction

In the article, Big Data Handled Using Apache Hadoop, I looked at what Hadoop is and how it is used to handle Big Data. In conclusion I noted that Hadoop is an aggregation of different modules whose purpose is to store and process Big Data in an efficient and secure manner. One of the modules that can be used on top of Hadoop is Apache Spark. In this article, which is the first in a series on Apache Spark, I am going to give a high-level overview of what Apache Spark is.

What is Apache Spark?

Spark, an open source project, is a fast, general purpose cluster computing platform that takes advantage of parallelism, by distributing processing across a cluster of nodes, in order to process data very fast. What makes Spark fast is that it does the processing in the main memory of the worker nodes, which avoids unnecessary input and output operations on disk. According to the Apache Spark documentation, Spark is capable of running programs up to 100x faster than Hadoop MapReduce in memory or 10x faster on disk.

Matei Zaharia started Spark in 2009 as a project within the AMPLab at the University of California, Berkeley. Spark was later donated to Apache in 2013 and was promoted to a Top-Level Apache Project in 2014. Spark is one of the most active projects managed by Apache, with more than 500 contributors from across 200 organizations responsible for code and a user base of 225 000+ members. Among the contributors are well-funded corporates such as IBM, Databricks and China’s Huawei.

Spark’s Architecture

Spark comes with a very advanced Directed Acyclic Graph (DAG) data processing engine. On top of that engine, Spark has a stack of domain specific libraries that provide different functionalities useful for different big data processing needs, as shown by the diagram below:

  • Spark SQL enables the use of SQL statements inside Spark applications.
  • Spark Streaming enables processing of live data streams.
  • Spark MLlib enables development of machine learning applications.
  • Spark GraphX enables graph processing and supports a growing library of graph algorithms.

Spark can run in:

  • a standalone mode on a single node having a supported OS.
  • a cluster mode on either Hadoop YARN or Apache Mesos.
  • the Amazon EC2 cloud, as well as on Kubernetes (an open source system for automating deployment, scaling and management of containerized applications).

In a distributed application such as Spark, there is a driver program that controls the execution and there are one or more worker nodes. The driver program allocates the tasks to the appropriate workers. In Spark, the driver program holds the SparkContext, which communicates with the appropriate cluster manager (this can be YARN, standalone or Mesos) to run the tasks, as shown below:

Conclusion

This is just a high-level overview of what Apache Spark is and of its architecture. In the next article in the series, I will take a deeper dive into Apache Spark.

Source:
Apache Spark Docs
Apache Spark Architecture Explained

Big Data Handled Using Apache Hadoop

Introduction

In the article, Big Data, what does it mean?, I spoke about what Big Data is and its characteristics. In conclusion, we noted that there are technologies that are used to handle Big Data and that one of those technologies is Hadoop. In this article we will look at what Hadoop is, with a high-level overview of how it is used to handle Big Data, based on the research I did online and what I understood.

What is Hadoop?

In order to understand what Hadoop is, one needs to understand the problem Hadoop is trying to address, and that problem is Big Data. We now know that Big Data is characterized by the 3 Vs, volume, velocity and variety. There is need to store huge amounts of data, in its different forms, and that cannot be done on a single computer. There is also need to process that huge amount of data and again it cannot be handled by a single computer using the traditional software we already have.

Let’s use a hypothetical scenario in order to understand what Hadoop is. There is a bumper harvest on a farm and the harvest, say wheat, needs to be stored. A single silo will not be able to store all the wheat, so more than one silo is needed, say 24. All the wheat is harvested and stored nicely in the silos. Now the wheat needs to be processed so that the farmer can produce flour for baking. Processing one silo takes about 24 hours, that is one day. For all the silos, that will take 24 days, and the farmer’s clients are already waiting for the flour and cannot wait for 24 days. The farmer then brings in more advanced machinery that can process more wheat in less time, say 4 hours for one silo. If my math is right, one machine will take 4 days to process all the silos, but this needs to be done in one day, so the farmer buys 3 more machines. All is good: the wheat is processed in a day and the following day all the flour has been packaged and is ready for delivery. All this is summarized below:
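The farmer's arithmetic can be checked with a short Python sketch. The numbers are the ones from the story, while the function and variable names are made up for illustration.

```python
import math

SILOS = 24
HOURS_PER_DAY = 24


def days_to_process(silos, hours_per_silo, machines=1):
    """Days needed when the silos are shared equally between the machines."""
    return silos * hours_per_silo / machines / HOURS_PER_DAY


old_machine_days = days_to_process(SILOS, hours_per_silo=24)   # 24.0 days
new_machine_days = days_to_process(SILOS, hours_per_silo=4)    # 4.0 days

# Machines needed to finish all the silos within a single day.
machines_needed = math.ceil(SILOS * 4 / HOURS_PER_DAY)          # 4 machines

print(old_machine_days, new_machine_days, machines_needed)
print(days_to_process(SILOS, hours_per_silo=4, machines=machines_needed))  # 1.0 day
```

Four machines in total means buying three more, which matches the story.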

Wheat can be thought of as data that needs to be stored, and one silo will not do. The same goes for data: one computer will not be sufficient, hence we need a number of computers. A collection of computers whose purpose is to store data and process it is known as a cluster, and each computer in that cluster is known as a node. The wheat can be thought of as being distributed among the silos. In the same way, data is distributed among the computers, and in Hadoop this is achieved using its distributed file system, known as the Hadoop Distributed File System, HDFS in short. The wheat is processed in parallel so that flour can be produced in a day. Data can likewise be processed in parallel so that a computation produces results faster. In Hadoop this is done using the MapReduce programming model. For the machines to operate smoothly, the farmer needs to oversee the whole process, and in Hadoop that is done using YARN, short for Yet Another Resource Negotiator.
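The idea of spreading data over nodes and processing the pieces in parallel can be sketched in a few lines of Python. This is only an analogy running on one machine with threads, not Hadoop code: the "nodes" are worker threads, the "blocks" are slices of a list, and the function names are made up.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_blocks(records, nodes):
    """Deal the records out round-robin, one block per node (like wheat into silos)."""
    return [records[i::nodes] for i in range(nodes)]


def process_block(block):
    """The work done on one node: here, simply adding up its records."""
    return sum(block)


records = list(range(1, 101))          # the "harvest": the numbers 1 to 100
blocks = split_into_blocks(records, nodes=4)

# Each block is processed by its own worker, in parallel.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(process_block, blocks))

# The partial results are combined into the final answer.
total = sum(partial_sums)
print(total)  # 5050
```

Splitting the data plays the role of HDFS, and processing the blocks and combining the partial results plays the role of MapReduce.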

Hadoop, an Apache open-source project, is therefore a combination of modules whose purpose is to store and process huge amounts of data. At the core of Hadoop, there is HDFS, responsible for storing the data and MapReduce, responsible for processing the data. In addition to the core there is Hadoop YARN and Hadoop Common. The modules are summarized as follows:

Hadoop Distributed File System (HDFS) is a distributed file system that stores data on commodity machines, providing very high aggregate bandwidth across the cluster.

Hadoop MapReduce is an implementation of the MapReduce programming model for large scale data processing.

Hadoop YARN is a platform responsible for managing computing resources in a cluster and using them for scheduling applications.

Hadoop Common contains libraries and utilities needed by other Hadoop modules.

Back to our farm analogy: if the farmer wants to keep, say, chickens for poultry farming, the farmer will build a chicken coop on the very same land where the wheat is grown and kept. The farmer can do more than one activity on the farm; the limit is the size of the land and the farmer’s talent. It is the same with Hadoop. It is not limited to the aforementioned modules, and other modules can be added onto the platform, such as Hive, HBase, Zookeeper, Kafka, Storm, Spark and so forth. All these modules perform different functions.

How it began

The co-founders of Hadoop are Doug Cutting and Mike Cafarella. The name Hadoop came from Doug’s son’s toy elephant. Doug and Mike were inspired by the “Google File System” paper that was published in October 2003. The initial development was done in the Apache Nutch project but was later moved to the new Hadoop project in January 2006. The first committer to add to the Hadoop project was Owen O’Malley in March 2006, and Hadoop 0.1.0 was released in April 2006. It continues to evolve through the many contributions that are being made to the project.

Conclusion

This was just a high-level overview of what Hadoop is and how it handles Big Data. From this, in short, we can say Hadoop is an aggregation of different modules whose purpose is to store and process Big Data in an efficient and secure manner. In future articles, I will look into the core modules of Hadoop in depth.

Source:
Wiki Apache Hadoop

Big Data, what does it mean?

Introduction

The buzz these days is Big Data. From government institutions to private entities, they are all talking about Big Data. What is Big Data? In this article, I will try to explain what Big Data is, based on the research I did and my understanding of it.

Big Data, what is it?

Before answering this question, the first question we must ask ourselves is, what is data? Dictionary definition of data states that data is facts and statistics collected together for reference or analysis. If you think about it, we have been collecting data for a very long time before the dawn of computers, using files and books. Now with the use of computers we moved from capturing facts or statistics on paper to using spreadsheets and databases on computers. Once the information has been captured on computers it is now in binary digital form.

Now let us look at the definition of big. The dictionary defines big as something of considerable size or extent. Some synonyms of big include large, great, huge, immense, enormous and so forth. Combining the two, it follows that we can say Big Data is an enormous collection of facts and statistics collected together for reference or analysis. Is it that simple? Not even close. To understand why, we need a little history. Remember we shifted from paper to computers for capturing data and, guess what, the first hard drive, which is a storage device on a computer for storing data, could hold only 5 MB of data. This is equivalent to Shakespeare’s complete works or a 30 second video clip of broadcast quality.

Today, a computer can have a drive that holds up to 15.36 terabytes (TB) of data. Considering that 10 terabytes is said to be equivalent to the printed collection of the U.S. Library of Congress, 15.36 terabytes is a lot of data on a single computer. I am sure we can start to see why Big Data is hyped these days. In terms of volume, we now have the capability of holding a huge amount of information using computers. But is that all? I mean, is Big Data only about the amount of data we can hold? Not even close.

We are no longer capturing data using computers only; we now have smart devices such as our phones, refrigerators, airplanes and even motor vehicles. All these devices have the capability of capturing data. Now imagine people on a platform like Twitter tweeting from their phones about something that has just happened, while people at work are also on the same bandwagon, posting about the same thing. All of a sudden there is a lot of information coming through Twitter, with, say, a hundred people tweeting every two seconds. At this point, Twitter is not only experiencing a surge of data, but the speed or velocity at which the data is coming through is also high. People are not only posting text, they are also posting pictures and videos. So on top of the surge and the speed, the data is also coming in different varieties.

At this point we can see that Big Data is not only about the volume of data but also about velocity and variety. In the Big Data world, these characteristics are known as the 3 Vs. Let’s look at each of the Vs.

Volume

Volume is one of the main characteristics of Big Data. The word volume itself means the amount of space that a substance or object occupies or that is enclosed within a container. In this case we can rephrase it as the amount of space that data occupies on a hard drive. For instance, think about the fact that Twitter has more active users than South Africa has people. Each of those users has posted a whole lot of photographs, tweets and videos. Instagram has reported that on an average day 80 million photos are shared, and Facebook is reported to store about 250 billion images.

Let’s try to quantify this data. Say you have a phone that has a resolution of 1440 x 2560 pixels. Multiplying 1440 by 2560 gives 3,686,400 pixels. You then multiply this number by the number of bytes per pixel:
16 bit per pixel image: 3,686,400 x 2 bytes per pixel = 7,372,800 bytes = 7.37 MB approx
32 bit per pixel image: 3,686,400 x 4 bytes per pixel = 14,745,600 bytes = 14.75 MB approx

Now for argument’s sake, if on an average day 80 million photos are shared on Instagram, how much space is needed if all users are using a phone with the above resolution and 32 bits per pixel? If my maths is right, that is about 1,180,000,000 MB, which is 1.18 petabytes, and take note, that is per DAY! That is a lot of hardware needed to store that much information.
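The calculation can be reproduced in Python. The sketch assumes uncompressed images and decimal units (1 MB = 1,000,000 bytes, 1 PB = 10^15 bytes), which is what the figures in this article imply. Real photos are compressed, so actual storage would be smaller.

```python
WIDTH, HEIGHT = 1440, 2560
PHOTOS_PER_DAY = 80_000_000

pixels = WIDTH * HEIGHT                 # 3,686,400 pixels

bytes_16bit = pixels * 2                # 7,372,800 bytes, about 7.37 MB
bytes_32bit = pixels * 4                # 14,745,600 bytes, about 14.75 MB

# Daily storage if every shared photo were an uncompressed 32 bit image.
daily_bytes = PHOTOS_PER_DAY * bytes_32bit
daily_mb = daily_bytes / 10**6          # about 1,180,000,000 MB
daily_pb = daily_bytes / 10**15         # about 1.18 PB

print(f"{bytes_16bit / 10**6:.2f} MB, {bytes_32bit / 10**6:.2f} MB")
print(f"{daily_mb:,.0f} MB per day = {daily_pb:.2f} PB per day")
```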

Velocity

Velocity is also an important characteristic of Big Data. Why? Again, from the meaning of the word velocity, which is, the speed of something in a given direction. In this case, the speed at which data is flowing to a data center. Back to our Twitter analogy, every second, on average, around 6,000 tweets are tweeted, which corresponds to over 350,000 tweets sent per minute, 500 million tweets per day and around 200 billion tweets per year. That is a lot of information that needs to be handled so that there are no bottlenecks. If a user logs on Twitter, the experience must be flawless.
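The rates quoted above can be checked against each other with a few lines of Python, starting from the figure of 6,000 tweets per second. The results line up with the rounded numbers in the text.

```python
TWEETS_PER_SECOND = 6_000

per_minute = TWEETS_PER_SECOND * 60          # 360,000 (over 350,000)
per_day = TWEETS_PER_SECOND * 60 * 60 * 24   # 518,400,000 (about 500 million)
per_year = per_day * 365                     # 189,216,000,000 (around 200 billion)

print(f"{per_minute:,} per minute")
print(f"{per_day:,} per day")
print(f"{per_year:,} per year")
```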

Variety 

Variety is the quality or state of being different or diverse; the absence of uniformity or monotony. In other words, not the same, and this is true for data as well. From the examples we used, people tweet using text, photos and videos. The data here is different. Data can be structured or unstructured, which means that not all data can easily fit into fields on a spreadsheet or a database application. There have to be ways and means of storing data in its different forms.

Conclusion 

Big Data is not only about how huge or enormous the data is, but also about how fast the data is moving and its type. If there is a surge in the amount of data that needs to be processed, the next question we need to answer is, how is it all handled? When a user logs on to Twitter, Facebook, Instagram or a news site like BBC, or streams on YouTube, the experience is flawless, yet the amount of data coming through those platforms is huge, moving fast and in different forms. This gave birth to various technologies that are used to manage the surge. These technologies include Hadoop, Spark, Hive and Kubernetes, just to mention a few. I will look into some of these technologies in future articles.

Source
How much is data
First hard drive
Twitter stats
Images measure

© 2024 Tendai

Theme by Tendai Bepete