DIfferent issues that may occur in Apache spark and their remedies.

DIfferent issues that may occur in spark and their remedies :


Currently we are using m4.2xlarge


Spark jobs are performed efficiently to process the large data with the configurations discussed below,  taming the big data to get desired output with low latency.    Here we discuss the Spark configuration parameters we applied to resolve issues ,and get efficient performance in AWS to process Big data of 30 gb.


Spark on yarn environment: (set, below two properties to submit job though spark-submit.
–num-executors NUM     Number of executors to launch (Default: 2).
–executor-cores NUM     Number of cores per executor (Default: 1).
Note: These switches to be used depending upon cluster capacity.
  • Troubleshooting:
Issue 1:
Exception in thread “main” org.apache.spark.SparkException: A master URL must be configured.
Resolution:
     Spark properties are configured in three ways:
  1. Setting the configuration properties in the code using spark conf
  2. Setting the switches in spark-submit tool
  3. Keeping the configuration properties in spark-defaults.confProperties configured as above will take precedence in the same order as above, i.e. firstly as in the code , secondly as in switches and thirdly as in default.conf .
  4. To avoid configuration mismatch among the three we have chosen to code “Dynamically Loading Spark Properties”.To avoid hard-coding in a SparkConf for instance, if you would like to run the same application with different masters or different memory size, Spark allows to create an empty conf:- val sc = new SparkContext(new SparkConf())
Issue 2:
15/04/01 11:25:56 INFO scheduler.DAGScheduler: Job 12 failed: collect at CardTransformations.java:225, took 308.106770 s                             Exception in thread “main” org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 193                   tasks (1028.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) at                                                                                                                                       org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages                                 (DAGScheduler.scala:1214)
       Resolution:
spark.driver.maxResultSize: Limit of total size of serialized results of all partitions for each Spark action (e.g. collect). (Should be at                    least 1M, or 0 for unlimited.) Jobs will be aborted if the total size is above this limit. Having a high limit may cause out-of-memory                        errors in driver (depends on spark.driver.memory and memory overhead of objects in JVM). Setting a proper limit can protect the                    driver from out-of-memory errors.
Increase the spark.driver.maxResultSize (Default: 1G) using “spark.driver.maxResultSize” property.
       Issue 3:
15/04/01 12:00:42 INFO scheduler.DAGScheduler: Job 13 failed: saveAsTextFile at JavaSchemaRDD.scala:42, took                                                 211.893538s  Exception in thread “main” org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 12662:0               was 57503819 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) – reserved (204800 bytes). Consider                             increasing spark.akka.frameSize or using broadcast variables for large values. at org.apache.spark.scheduler.                                                                 DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 Resolution:
spark.akka.frameSize: Maximum message size to allow in “control plane” communication (for serialized tasks and task results), in MB.          Increase this if the tasks need to send back large results to the driver (e.g. using collect() on a large dataset).
Increase the spark.akka.frameSize (Default: 10M) by using ” spark.akka.frameSize” property.
Issue 4:

15/04/02 15:27:28 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception             event ([id: 0xc05241a5, /172.31.31.17:55446 => /172.31.31.15:33175] EXCEPTION: java.lang.OutOfMemoryError: Java heap space                  at java.lang.Object.clone(Native Method)  at akka.util.CompactByteString$.apply(ByteString.scala:410)  at                                                                     akka.util.ByteString$.apply(ByteString.scala:22)
Resolution:
spark.driver.memory: Amount of memory to use for the driver process, i.e. where SparkContext is initialized.
Increase the driver memory (Default: 512M) using “spark.driver.memory” property.
[sparkDriver]
java.lang.OutOfMemoryError: Java heap space
15/04/02 15:28:04 INFO scheduler.DAGScheduler: Job 0 failed: collect at LevelOneTotalReportsManager.java:99, took 895.599089 s
15/04/02 15:28:04 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception:
Job cancelled because SparkContext was shut down)
   Issue 5:

15/03/19 17:13:19 ERROR Executor: Exception in task 55.0 in stage 12.0 (TID 894)
java.lang.OutOfMemoryError: GC overhead limit exceeded
   Resolution:
spark.executor.memory : Amount of memory to use per executor process, in the same format as JVM memory strings.
Increase the executor memory (Default: 1G) using “spark.executor.memory” property.


   Issue 6:
ERROR TaskSetManager: Total size of serialized results of 8113 tasks (1131.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
ERROR TaskSetManager: Total size of serialized results of 8114 tasks (1131.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
ERROR TaskSetManager: Total size of serialized results of 8115 tasks (1131.2 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
ERROR TaskSetManager: Total size of serialized results of 8116 tasks (1131.3 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)


  Resolution:
  • set by SparkConf: conf.set("spark.driver.maxResultSize", "3g")
  • set by spark-defaults.conf: spark.driver.maxResultSize 3g
  • set when calling spark-submit: --conf spark.driver.maxResultSize=3g



















Some performance tips can be found here :












  Run Spark on the same nodes as HDFS. The simplest way is to set up a Spark in YARN mode.
    1. Start with minimum nodes ( i.e 1 master 2 worker nodes ) if you are not sure about the Data volume
    2. As spark run on the memory , we recommend to take up to 256GB Ram memory for each worker node
    3. Change the Execution memory for each user, as the default in Cloudera for example is less than 10Gb
    4. For low-latency data stores like HBase, it may be preferrable to run computing jobs on different nodes than the storage system to avoid interference.
    5. Please choose “Run in memory and disk” mode for the transformations in Spark.
    6. Finally, note that the Java VM does not always behave well with more than 200 GB of RAM. If you purchase machines with more RAM than this, you can run multiple worker JVMs per node.
    • Using a 10 Gigabit or higher network is the best way to make these applications faster. This is especially true for “distributed reduce” applications such as group-bys, reduce-bys, and SQL joins.
    • You should likely provision at least 8-16 cores per machine. Depending on the CPU cost of your workload, you may also need more
    • We recommend having 4-8 disks per node, configured without RAID (just as separate mount points). In Linux, mount the disks with the noatime option to reduce unnecessary writes.

Memory and CPU management in configuration :



The likely first impulse would be to use --num-executors 6 --executor-cores 15 --executor-memory 63G. However, this is the wrong approach because:
  • 63GB + the executor memory overhead won’t fit within the 63GB capacity of the NodeManagers.
  • The application master will take up a core on one of the nodes, meaning that there won’t be room for a 15-core executor on that node.
  • 15 cores per executor can lead to bad HDFS I/O throughput.
A better option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G. Why?
  • This config results in three executors on all nodes except for the one with the AM, which will have two executors.
  • --executor-memory was derived as (63/3 executors per node) = 21.  21 * 0.07 = 1.47.  21 – 1.47 ~ 19.


Full details can be found  in cloudera blog here :

Also in Spark documentation we can get the details  of hardware provisioning :

About memory :
n general, Spark can run well with anywhere from 8 GB to hundreds of gigabytes of memory per machine. In all cases, we recommend allocating only at most 75% of the memory for Spark; leave the rest for the operating system and buffer cache.
How much memory you will need will depend on your application. To determine how much your application uses for a certain dataset size, load part of your dataset in a Spark RDD and use the Storage tab of Spark’s monitoring UI (http://<driver-node>:4040) to see its size in memory. Note that memory usage is greatly affected by storage level and serialization format – see the tuning guide for tips on how to reduce it.
Finally, note that the Java VM does not always behave well with more than 200 GB of RAM. If you purchase machines with more RAM than this, you can run multiple worker JVMs per node. In Spark’s standalone mode, you can set the number of workers per node with the SPARK_WORKER_INSTANCESvariable in conf/spark-env.sh, and the number of cores per worker with SPARK_WORKER_CORES.


About all other like discs and processors and Network provisioning you can find details here:
To Analyze the spark UI :


Comments

Popular posts from this blog

Steps to Install zeppelin with spark

Parquet is a column based data store or File Format (Useful for Spark read/write and SQL in order to boost performance)