CCA Spark and Hadoop Developer Exam v7.0

Exam contains 96 questions

Problem Scenario 4: You have been given MySQL DB with following details. user=retail_dba password=cloudera database=retail_db table=retail_db.categories jdbc URL = jdbc:mysql://quickstart:3306/retail_db
Please accomplish following activities.
Import single table categories (subset data) to a hive managed table, where category_id is between 1 and 22.



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:
Solution :
Step 1 : Import Single table (Subset data)
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username=retail_dba --password=cloudera --table=categories --where "\`category_id\` between 1 and 22" --hive-import -m 1
Note: Here the ` (backtick) is the character you find on the ~ key.
This command will create a managed table and content will be created in the following directory.
/user/hive/warehouse/categories
Step 2 : Check whether table is created or not (In Hive)
show tables;
select * from categories;
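As an extra sanity check (not part of the original solution), the subset boundaries could be verified in Hive with:
select min(category_id), max(category_id) from categories;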

Problem Scenario 39 : You have been given two files
spark16/file1.txt
1,9,5
2,7,4
3,8,3
spark16/file2.txt
1,g,h
2,i,j
3,k,l
Load these two files as Spark RDDs and join them to produce the below results
(1,((9,5),(g,h)))
(2,((7,4),(i,j)))
(3,((8,3),(k,l)))
And write a code snippet which will sum the second columns of the above joined results (5+4+3).



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:
Solution :
Step 1 : Create files in hdfs using Hue.
Step 2 : Create pairRDD for both the files.
val one = sc.textFile("spark16/file1.txt").map {
  _.split(",", -1) match {
    case Array(a, b, c) => (a, (b, c))
  }
}
val two = sc.textFile("spark16/file2.txt").map {
  _.split(",", -1) match {
    case Array(a, b, c) => (a, (b, c))
  }
}
Step 3 : Join both the RDDs.
val joined = one.join(two)
Step 4 : Sum the second column values.
val sum = joined.map {
  case (_, ((_, num2), (_, _))) => num2.toInt
}.reduce(_ + _)
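With the sample files above, the joined second-column values are 5, 4 and 3, so sum evaluates to 12.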

Problem Scenario 17 : You have been given following mysql database details as well as other info. user=retail_dba password=cloudera database=retail_db jdbc URL = jdbc:mysql://quickstart:3306/retail_db
Please accomplish below assignment.
1. Create a table in hive as below.
create table departments_hive01(department_id int, department_name string, avg_salary int);
2. Create another table in mysql using below statement CREATE TABLE IF NOT EXISTS departments_hive01(id int, department_name varchar(45), avg_salary int);
3. Copy all the data from departments table to departments_hive01 using insert into departments_hive01 select a.*, null from departments a;
Also insert following records as below
insert into departments_hive01 values(777, "Not known",1000); insert into departments_hive01 values(8888, null,1000); insert into departments_hive01 values(666, null,1100);
4. Now import data from mysql table departments_hive01 to this hive table. Please make sure that the data is visible using the below hive command. Also, while importing, if a null value is found for the department_name column replace it with "" (empty string) and for the id column with -999.
select * from departments_hive01;



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:
Solution :
Step 1 : Create hive table as below.
hive
show tables;
create table departments_hive01(department_id int, department_name string, avg_salary int);
Step 2 : Create table in mysql db as well.
mysql --user=retail_dba --password=cloudera
use retail_db;
CREATE TABLE IF NOT EXISTS departments_hive01(id int, department_name varchar(45), avg_salary int);
show tables;
Step 3 : Insert data into the mysql table.
insert into departments_hive01 select a.*, null from departments a;
Check the inserted data.
select * from departments_hive01;
Now insert the null records as given in the problem.
insert into departments_hive01 values(777, "Not known",1000);
insert into departments_hive01 values(8888, null,1000);
insert into departments_hive01 values(666, null,1100);
Step 4 : Now import data in hive as per requirement.
sqoop import \
--connect jdbc:mysql://quickstart:3306/retail_db \
--username=retail_dba \
--password=cloudera \
--table departments_hive01 \
--hive-home /user/hive/warehouse \
--hive-import \
--hive-overwrite \
--hive-table departments_hive01 \
--fields-terminated-by '\001' \
--null-string '' \
--null-non-string -999 \
--split-by id \
-m 1
Step 5 : Check the data in the directory.
hdfs dfs -ls /user/hive/warehouse/departments_hive01
hdfs dfs -cat /user/hive/warehouse/departments_hive01/part*
Check data in hive table.
Select * from departments_hive01;

Problem Scenario 21 : You have been given a log generating service as below.
start_logs (It will generate continuous logs)
tail_logs (You can check what logs are being generated)
stop_logs (It will stop the log service)
Path where logs are generated using above service : /opt/gen_logs/logs/access.log
Now write a flume configuration file named flume1.conf, and using that configuration file dump logs into the HDFS file system in a directory called flume1. The Flume channel should have the following properties as well: after every 100 messages it should commit, it should use a non-durable/faster channel, and it should be able to hold a maximum of 1000 events.
Solution :
Step 1 : Create flume configuration file, with below configuration for source, sink and channel.
# Define source, sink, channel and agent.
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /opt/gen_logs/logs/access.log
# Describe sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = flume1
agent1.sinks.sink1.hdfs.fileType = DataStream
# Now we need to define channel1 properties.
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
Step 2 : Run below command which will use this configuration file and append data in hdfs.
Start log service using : start_logs
Start flume service :
flume-ng agent --conf /home/cloudera/flumeconf --conf-file /home/cloudera/flumeconf/flume1.conf --name agent1 -Dflume.root.logger=DEBUG,console
Wait for a few mins and then stop the log service.
stop_logs
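To verify the sink output (an extra check, not part of the original solution), list the target directory, assuming flume1 resolves under the user's HDFS home:
hdfs dfs -ls flume1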



Answer : See the explanation for Step by Step Solution and configuration.

Problem Scenario 67 : You have been given below code snippet.
lines = sc.parallelize(['Its fun to have fun,', 'but you have to know how.'])
r1 = lines.map(lambda x: x.replace(',', ' ').replace('.', ' ').replace('-', ' ').lower())
r2 = r1.flatMap(lambda x: x.split())
r3 = r2.map(lambda x: (x, 1))
operation1
r5 = r4.map(lambda x: (x[1], x[0]))
r6 = r5.sortByKey(ascending=False)
r6.take(20)
Write a correct code snippet for operation1 which will produce the desired output, shown below.
[(2, 'fun'), (2, 'to'), (2, 'have'), (1, 'its'), (1, 'know'), (1, 'how'), (1, 'you'), (1, 'but')]



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:
Solution :
r4 = r3.reduceByKey(lambda x,y:x+y)
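r3 emits a (word, 1) pair for every token, so reduceByKey sums the 1s per word to produce the counts; the later map swaps each pair to (count, word) so that sortByKey(ascending=False) returns the words ordered by frequency, matching the expected output.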

Problem Scenario 25 : You have been given below comma separated employee information, which needs to be added to the /home/cloudera/flumetest/in.txt file (to do a tail source).
sex,name,city
1,alok,mumbai
1,jatin,chennai
1,yogesh,kolkata
2,ragini,delhi
2,jyotsana,pune
1,valmiki,banglore
Create a flume conf file using the fastest non-durable channel, which writes data into the hive warehouse directory, in two separate tables called flumemaleemployee1 and flumefemaleemployee1 (create the hive tables as well for the given data). Please use a tail source with the /home/cloudera/flumetest/in.txt file.
flumemaleemployee1 : will contain only male employees' data
flumefemaleemployee1 : will contain only female employees' data



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:
Solution :
Step 1 : Create hive tables for flumemaleemployee1 and flumefemaleemployee1.
CREATE TABLE flumemaleemployee1
(
sex_type int, name string, city string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
CREATE TABLE flumefemaleemployee1
(
sex_type int, name string, city string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
Step 2 : Create below directory.
mkdir /home/cloudera/flumetest/
cd /home/cloudera/flumetest/
Step 3 : Create flume configuration file, with below configuration for source, sinks and channels, and save it in flume5.conf.
agent.sources = tailsrc
agent.channels = mem1 mem2
agent.sinks = std1 std2
agent.sources.tailsrc.type = exec
agent.sources.tailsrc.command = tail -F /home/cloudera/flumetest/in.txt
agent.sources.tailsrc.batchSize = 1
agent.sources.tailsrc.interceptors = i1
agent.sources.tailsrc.interceptors.i1.type = regex_extractor
agent.sources.tailsrc.interceptors.i1.regex = ^(\\d)
agent.sources.tailsrc.interceptors.i1.serializers = t1
agent.sources.tailsrc.interceptors.i1.serializers.t1.name = type
agent.sources.tailsrc.selector.type = multiplexing
agent.sources.tailsrc.selector.header = type
agent.sources.tailsrc.selector.mapping.1 = mem1
agent.sources.tailsrc.selector.mapping.2 = mem2
agent.sinks.std1.type = hdfs
agent.sinks.std1.channel = mem1
agent.sinks.std1.batchSize = 1
agent.sinks.std1.hdfs.path = /user/hive/warehouse/flumemaleemployee1
agent.sinks.std1.hdfs.rollInterval = 0
agent.sinks.std1.hdfs.fileType = DataStream
agent.sinks.std2.type = hdfs
agent.sinks.std2.channel = mem2
agent.sinks.std2.batchSize = 1
agent.sinks.std2.hdfs.path = /user/hive/warehouse/flumefemaleemployee1
agent.sinks.std2.hdfs.rollInterval = 0
agent.sinks.std2.hdfs.fileType = DataStream
agent.channels.mem1.type = memory
agent.channels.mem1.capacity = 100
agent.channels.mem2.type = memory
agent.channels.mem2.capacity = 100
agent.sources.tailsrc.channels = mem1 mem2
Step 4 : Run below command which will use this configuration file and append data in hdfs.
Start flume service:
flume-ng agent --conf /home/cloudera/flumeconf --conf-file /home/cloudera/flumeconf/flume5.conf --name agent
Step 5 : Open another terminal create a file at /home/cloudera/flumetest/in.txt.
Step 6 : Enter below data in file and save it.
1,alok,mumbai
1,jatin,chennai
1,yogesh,kolkata
2,ragini,delhi
2,jyotsana,pune
1,valmiki,banglore
Step 7 : Open hue and check whether the data is available in the hive tables or not.
Step 8 : Stop flume service by pressing ctrl+c

Problem Scenario 37 : ABCTECH.com has done a survey on their Exam Products feedback using a web based form, with the following free text fields as input in the web UI.

Name: String
Subscription Date: String
Rating : String
And survey data has been saved in a file called spark9/feedback.txt
Christopher|Jan 11, 2015|5
Kapil|11 Jan, 2015|5
Thomas|6/17/2014|5
John|22-08-2013|5
Mithun|2013|5
Jitendra||5
Write a spark program using regular expressions which will filter all the valid dates and save the records in two separate files (good records and bad records).



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:
Solution :
Step 1 : Create a file first using Hue in hdfs.
Step 2 : Write valid regular expression syntax for checking whether records have valid dates or not.
val reg1 = """(\d+)\s(\w{3})(,)\s(\d{4})""".r // 11 Jan, 2015
val reg2 = """(\d+)(/)(\d+)(/)(\d{4})""".r // 6/17/2014
val reg3 = """(\d+)(-)(\d+)(-)(\d{4})""".r // 22-08-2013
val reg4 = """(\w{3})\s(\d+)(,)\s(\d{4})""".r // Jan 11, 2015
Step 3 : Load the file as an RDD.
val feedbackRDD = sc.textFile("spark9/feedback.txt")
Step 4 : As the data is pipe separated, split it accordingly.
val feedbackSplit = feedbackRDD.map(line => line.split('|'))
Step 5 : Now get the valid records as well as the bad records.
val validRecords = feedbackSplit.filter(x =>
(reg1.pattern.matcher(x(1).trim).matches | reg2.pattern.matcher(x(1).trim).matches | reg3.pattern.matcher(x(1).trim).matches | reg4.pattern.matcher(x(1).trim).matches))
val badRecords = feedbackSplit.filter(x =>
!(reg1.pattern.matcher(x(1).trim).matches | reg2.pattern.matcher(x(1).trim).matches | reg3.pattern.matcher(x(1).trim).matches | reg4.pattern.matcher(x(1).trim).matches))
Step 6 : Now convert each Array into a tuple of Strings.
val valid = validRecords.map(e => (e(0), e(1), e(2)))
val bad = badRecords.map(e => (e(0), e(1), e(2)))
Step 7 : Save the output as text files; the output must be written as a single file.
valid.repartition(1).saveAsTextFile("spark9/good.txt")
bad.repartition(1).saveAsTextFile("spark9/bad.txt")

Problem Scenario 66 : You have been given below code snippet.
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("ant", "falcon", "squid"), 2)
val d = c.keyBy(_.length)
operation1
Write a correct code snippet for operation1 which will produce the desired output, shown below.
Array[(Int, String)] = Array((4,lion))



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:
Solution :
b.subtractByKey(d).collect
subtractByKey [Pair] : Very similar to subtract, but instead of supplying a function, the key component of each pair will be automatically used as the criterion for removing items from the first RDD.
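In this scenario, b contains the keys 3 (dog, cat), 4 (lion), 5 (tiger, eagle) and 6 (spider), while d contains the keys 3 (ant), 5 (squid) and 6 (falcon); every pair in b whose key also appears in d is removed, so only (4,lion) survives.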

Problem Scenario 22 : You have been given below comma separated employee information.
name,salary,sex,age
alok,100000,male,29
jatin,105000,male,32
yogesh,134000,male,39
ragini,112000,female,35
jyotsana,129000,female,39
valmiki,123000,male,29
Use the netcat service on port 44444, and nc above data line by line. Please do the following activities.
1. Create a flume conf file using the fastest channel, which writes data into the hive warehouse directory, in a table called flumeemployee (create the hive table as well for the given data).
2. Write a hive query to read average salary of all employees.



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:
Solution :
Step 1 : Create hive table for flumeemployee.
CREATE TABLE flumeemployee
(
name string, salary int, sex string, age int
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
Step 2 : Create flume configuration file, with below configuration for source, sink and channel and save it in flume2.conf.
# Define source, sink, channel and agent.
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure source1
agent1.sources.source1.type = netcat
agent1.sources.source1.bind = 127.0.0.1
agent1.sources.source1.port = 44444
# Describe sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = /user/hive/warehouse/flumeemployee
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.fileType = DataStream
# Now we need to define channel1 properties.
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
Step 3 : Run below command which will use this configuration file and append data in hdfs.
Start flume service:
flume-ng agent --conf /home/cloudera/flumeconf --conf-file /home/cloudera/flumeconf/flume2.conf --name agent1
Step 4 : Open another terminal and use the netcat service.
nc localhost 44444
Step 5 : Enter data line by line.
alok,100000,male,29
jatin,105000,male,32
yogesh,134000,male,39
ragini,112000,female,35
jyotsana,129000,female,39
valmiki,123000,male,29
Step 6 : Open hue and check whether the data is available in the hive table or not.
Step 7 : Stop flume service by pressing ctrl+c
Step 8 : Calculate average salary on the hive table using the below query. You can use either the hive command line tool or hue.
select avg(salary) from flumeemployee;

Problem Scenario 8 : You have been given following mysql database details as well as other info.
Please accomplish following.
1. Import joined result of orders and order_items table, joined on orders.order_id = order_items.order_item_order_id.
2. Also make sure the imported data is partitioned into 2 files e.g. part-00000, part-00001
3. Also make sure you use the order_id column for sqoop to use for boundary conditions.



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:
Solution :
Step 1 : Clean the hdfs file system; if these directories exist, clean them out.
hadoop fs -rm -R departments
hadoop fs -rm -R categories
hadoop fs -rm -R products
hadoop fs -rm -R orders
hadoop fs -rm -R order_items
hadoop fs -rm -R customers
Step 2 : Now import the joined result of orders and order_items as per requirement.
sqoop import \
--connect jdbc:mysql://quickstart:3306/retail_db \
--username=retail_dba \
--password=cloudera \
--query="select * from orders join order_items on orders.order_id = order_items.order_item_order_id where \$CONDITIONS" \
--target-dir /user/cloudera/order_join \
--split-by order_id \
--num-mappers 2
Step 3 : Check imported data.
hdfs dfs -ls order_join
hdfs dfs -cat order_join/part-m-00000
hdfs dfs -cat order_join/part-m-00001

Problem Scenario 70 : Write down a Spark application using Python which reads a file "Content.txt" (on hdfs) with the following content. Do the word count and save the results in a directory called "problem85" (on hdfs).

Content.txt
Hello this is ABCTECH.com
This is XYZTECH.com
Apache Spark Training
This is Spark Learning Session
Spark is faster than MapReduce



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:
Solution :
Step 1 : Create an application with following code and store it in problem84.py
# Import SparkContext and SparkConf
from pyspark import SparkContext, SparkConf
# Create configuration object and set App name
conf = SparkConf().setAppName("CCA 175 Problem 85")
sc = SparkContext(conf=conf)
#load data from hdfs
contentRDD = sc.textFile("Content.txt")
#filter out empty lines
nonempty_lines = contentRDD.filter(lambda x: len(x) > 0)
#Split each line based on space
words = nonempty_lines.flatMap(lambda x: x.split(' '))
#Do the word count
wordcounts = words.map(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .map(lambda x: (x[1], x[0])).sortByKey(False)
for word in wordcounts.collect():
    print(word)
#Save final data
wordcounts.saveAsTextFile("problem85")
Step 2 : Submit this application.
spark-submit --master yarn problem84.py
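Note (an extra precaution, not part of the original solution): saveAsTextFile fails if the output directory already exists, so clear it before re-running, for example:
hdfs dfs -rm -R problem85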

Problem Scenario 85 : In Continuation of previous question, please accomplish following activities.
1. Select all the columns from product table with output header as below.
productID AS ID
code AS Code
name AS Description
price AS 'Unit Price'
2. Select code and name both separated by ' -' and the header name should be 'Product Description'.
3. Select all distinct prices.
4. Select distinct price and name combination.
5. Select all price data sorted by both code and productID combination.
6. count number of products.
7. Count number of products for each code.



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:
Solution :
Step 1 : Select all the columns from product table with output header as below: productID AS ID, code AS Code, name AS Description, price AS 'Unit Price'.
val results = sqlContext.sql("""SELECT productID AS ID, code AS Code, name AS Description, price AS `Unit Price` FROM products ORDER BY ID""")
results.show()
Step 2 : Select code and name both separated by ' -' and the header name should be 'Product Description'.
val results = sqlContext.sql("""SELECT CONCAT(code, ' -', name) AS `Product Description`, price FROM products""")
results.show()
Step 3 : Select all distinct prices.
val results = sqlContext.sql("""SELECT DISTINCT price AS `Distinct Price` FROM products""")
results.show()
Step 4 : Select distinct price and name combination.
val results = sqlContext.sql("""SELECT DISTINCT price, name FROM products""")
results.show()
Step 5 : Select all price data sorted by both code and productID combination.
val results = sqlContext.sql("""SELECT * FROM products ORDER BY code, productID""")
results.show()
Step 6 : Count number of products.
val results = sqlContext.sql("""SELECT COUNT(*) AS `Count` FROM products""")
results.show()
Step 7 : Count number of products for each code.
val results = sqlContext.sql("""SELECT code, COUNT(*) FROM products GROUP BY code""")
results.show()
val results = sqlContext.sql("""SELECT code, COUNT(*) AS count FROM products GROUP BY code ORDER BY count DESC""")
results.show()

Problem Scenario 77 : You have been given MySQL DB with following details. user=retail_dba password=cloudera database=retail_db table=retail_db.orders table=retail_db.order_items jdbc URL = jdbc:mysql://quickstart:3306/retail_db
Columns of orders table : (order_id, order_date, order_customer_id, order_status)
Columns of order_items table : (order_item_id, order_item_order_id, order_item_product_id, order_item_quantity, order_item_subtotal, order_item_product_price)
Please accomplish following activities.
1. Copy "retail_db.orders" and "retail_db.order_items" table to hdfs in respective directories p92_orders and p92_order_items .
2. Join these data using order_id in Spark and Python
3. Calculate total revenue per day and per order
4. Calculate total and average revenue for each date, using
- combineByKey
- aggregateByKey



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:
Solution :
Step 1 : Import single tables.
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username=retail_dba --password=cloudera --table=orders --target-dir=p92_orders -m 1
sqoop import --connect jdbc:mysql://quickstart:3306/retail_db --username=retail_dba --password=cloudera --table=order_items --target-dir=p92_order_items -m 1
Note : Please check that you don't have a space before or after the '=' sign. Sqoop uses the MapReduce framework to copy data from RDBMS to hdfs.
Step 2 : Read the data from one of the partitions, created using the above command.
hadoop fs -cat p92_orders/part-m-00000
hadoop fs -cat p92_order_items/part-m-00000
Step 3 : Load these above two directories as RDDs using Spark and Python (open a pyspark terminal and do the following).
orders = sc.textFile("p92_orders")
orderItems = sc.textFile("p92_order_items")
Step 4 : Convert the RDDs into key-value pairs (order_id as the key and the rest of the values as the value).
#First value is order_id
ordersKeyValue = orders.map(lambda line: (int(line.split(",")[0]), line))
#Second value is order_id
orderItemsKeyValue = orderItems.map(lambda line: (int(line.split(",")[1]), line))
Step 5 : Join both the RDDs using order_id.
joinedData = orderItemsKeyValue.join(ordersKeyValue)
#print the joined data
for line in joinedData.collect():
    print(line)
Format of joinedData as below.
[order_id, 'All columns from orderItemsKeyValue', 'All columns from ordersKeyValue']
Step 6 : Now fetch selected values: order_id, order date and amount collected on this order.
#Returned row will contain ((order_date, order_id), amount_collected)
revenuePerDayPerOrder = joinedData.map(lambda row: ((row[1][1].split(",")[1], row[0]), float(row[1][0].split(",")[4])))
#print the result
for line in revenuePerDayPerOrder.collect():
    print(line)
Step 7 : Now calculate total revenue per day and per order.
A. Using reduceByKey
totalRevenuePerDayPerOrder = revenuePerDayPerOrder.reduceByKey(lambda runningSum, value: runningSum + value)
for line in totalRevenuePerDayPerOrder.sortByKey().collect():
    print(line)
#Generate data as (date, amount_collected) (ignore order_id)
dateAndRevenueTuple = totalRevenuePerDayPerOrder.map(lambda line: (line[0][0], line[1]))
for line in dateAndRevenueTuple.sortByKey().collect():
    print(line)
Step 8 : Calculate the total amount collected for each day, and also the number of orders per day.
#Generate output as (date, (total_revenue_for_date, total_number_of_orders))
#Line 1 : it will generate the tuple (revenue, 1)
#Line 2 : here we sum all revenues and at the same time keep a counter to maintain the number of records.
#Line 3 : final function to merge all the combiners.
totalRevenueAndTotalCount = dateAndRevenueTuple.combineByKey( \
    lambda revenue: (revenue, 1), \
    lambda revenueSumTuple, amount: (revenueSumTuple[0] + amount, revenueSumTuple[1] + 1), \
    lambda tuple1, tuple2: (round(tuple1[0] + tuple2[0], 2), tuple1[1] + tuple2[1]))
for line in totalRevenueAndTotalCount.collect():
    print(line)
Step 9 : Now calculate the average revenue for each date.
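The original text breaks off here; a minimal sketch of the remaining step, assuming the (date, (total_revenue, order_count)) pairs produced by combineByKey in Step 8 (the variable name averageRevenuePerDate is only illustrative), could be:
#Average revenue per date = total revenue for that date / number of orders on that date
averageRevenuePerDate = totalRevenueAndTotalCount.map(lambda line: (line[0], round(line[1][0] / line[1][1], 2)))
for line in averageRevenuePerDate.sortByKey().collect():
    print(line)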

Problem Scenario 49 : You have been given below code snippet (do a sum of values by key), with intermediate output.
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
Now define two functions (addToCounts, sumPartitionCounts) which will produce the following results.
Output 1:
countByKey.collect
res3: Array[(String, Int)] = Array((foo,5), (bar,3))
import scala.collection._
val initialSet = scala.collection.mutable.HashSet.empty[String]
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)
Now define two functions (addToSet, mergePartitionSets) which will produce the following results.
Output 2:
uniqueByKey.collect
res4: Array[(String, scala.collection.mutable.HashSet[String])] = Array((foo,Set(B, A)), (bar,Set(C, D)))



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:
Solution :
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2
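A note on how these fit together: in aggregateByKey the first argument list supplies the zero value (initialCount or initialSet) that each partition starts from, the first function (addToCounts / addToSet) folds one value into the running accumulator within a partition, and the second function (sumPartitionCounts / mergePartitionSets) merges the per-partition accumulators. That is why the counts come out as 5 for foo and 3 for bar, and the sets as the distinct values per key.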

Problem Scenario 93 : You have to run your Spark application locally with 8 threads or locally on 8 cores. Replace XXX with correct values.
spark-submit --class com.hadoopexam.MyTask XXX \
--deploy-mode cluster \
$SPARK_HOME/lib/hadoopexam.jar 10



Answer : See the explanation for Step by Step Solution and configuration.

Explanation:

Solution :
XXX: --master local[8]
Notes : The master URL passed to Spark can be in one of the following formats:

Master URL : Meaning
local : Run Spark locally with one worker thread (i.e. no parallelism at all).
local[K] : Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
local[*] : Run Spark locally with as many worker threads as logical cores on your machine.
spark://HOST:PORT : Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
mesos://HOST:PORT : Connect to the given Mesos cluster. The port must be whichever one your master is configured to use, which is 5050 by default. Or, for a Mesos cluster using ZooKeeper, use mesos://zk://.... To submit with --deploy-mode cluster, the HOST:PORT should be configured to connect to the MesosClusterDispatcher.
yarn : Connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable.
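Substituting the answer back into the template from the question gives, as a sketch:
spark-submit --class com.hadoopexam.MyTask --master local[8] --deploy-mode cluster $SPARK_HOME/lib/hadoopexam.jar 10
Note that --deploy-mode cluster only applies when a cluster manager is used; with a local[K] master the driver always runs in the local JVM, so in practice that flag is usually omitted.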

