

1 Approved Answer


LowestRatedMovieSpark.py

from pyspark import SparkConf, SparkContext

# This function just creates a Python "dictionary" we can later
# use to convert movie IDs to movie names while printing out
# the final results.
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Take each line of u.data and convert it to (movieID, (rating, 1.0)).
# This way we can then add up all the ratings for each movie, and
# the total number of ratings for each movie (which lets us compute the average).
def parseInput(line):
    fields = line.split()
    return (int(fields[1]), (float(fields[2]), 1.0))

if __name__ == "__main__":
    # The main script - create our SparkContext
    conf = SparkConf().setAppName("WorstMovies")
    sc = SparkContext(conf = conf)

    # Load up our movie ID -> movie name lookup table
    movieNames = loadMovieNames()

    # Load up the raw u.data file
    lines = sc.textFile("hdfs:///user/maria_dev/ml-100k/u.data")

    # Convert to (movieID, (rating, 1.0))
    movieRatings = lines.map(parseInput)

    # Reduce to (movieID, (sumOfRatings, totalRatings))
    ratingTotalsAndCount = movieRatings.reduceByKey(lambda movie1, movie2: (movie1[0] + movie2[0], movie1[1] + movie2[1]))

    # Map to (movieID, averageRating)
    averageRatings = ratingTotalsAndCount.mapValues(lambda totalAndCount: totalAndCount[0] / totalAndCount[1])

    # Sort by average rating
    sortedMovies = averageRatings.sortBy(lambda x: x[1])

    # Take the top 10 results
    results = sortedMovies.take(10)

    # Print them out:
    for result in results:
        print(movieNames[result[0]], result[1])
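To make the (rating, 1.0) trick concrete, here is a tiny pure-Python illustration (not part of the script) of how the reduceByKey lambda above merges two running (ratingSum, ratingCount) pairs for the same movie; the numbers are made up.

# Illustration only: reduceByKey repeatedly merges two (ratingSum, ratingCount) pairs per movie.
mergeRatings = lambda movie1, movie2: (movie1[0] + movie2[0], movie1[1] + movie2[1])

pairA = (4.0, 1.0)             # one rating of 4.0
pairB = (7.0, 2.0)             # two earlier ratings that sum to 7.0
merged = mergeRatings(pairA, pairB)
print(merged)                  # (11.0, 3.0)
print(merged[0] / merged[1])   # 3.666... -- the average that mapValues computes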

Resolving the same problem with Spark 2.0 (DataFrame)

The same MovieLens dataset is also used to recommend movies for a given user with the machine learning library MLlib (see MovieRecommendationsALS.py below).

LowestRatedMovieDataFrame.py

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

def parseInput(line):
    fields = line.split()
    return Row(movieID = int(fields[1]), rating = float(fields[2]))

if __name__ == "__main__":
    # Create a SparkSession
    spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

    # Load up our movie ID -> name dictionary
    movieNames = loadMovieNames()

    # Get the raw data
    lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
    # Convert it to an RDD of Row objects with (movieID, rating)
    movies = lines.map(parseInput)
    # Convert that to a DataFrame
    movieDataset = spark.createDataFrame(movies)

    # Compute average rating for each movieID
    averageRatings = movieDataset.groupBy("movieID").avg("rating")

    # Compute count of ratings for each movieID
    counts = movieDataset.groupBy("movieID").count()

    # Join the two together (we now have movieID, count, and avg(rating) columns)
    averagesAndCounts = counts.join(averageRatings, "movieID")

    # Pull the top 10 results
    topTen = averagesAndCounts.orderBy("avg(rating)").take(10)

    # Print them out, converting movie IDs to names as we go.
    for movie in topTen:
        print(movieNames[movie[0]], movie[1], movie[2])

    # Stop the session
    spark.stop()
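As an alternative sketch (not part of the original answer), the average and the count can be computed in a single pass with one groupBy().agg() call and explicit aliases, which avoids the join and the generated "avg(rating)" column name. It assumes the movieDataset and movieNames variables defined in the script above.

# Alternative sketch, assuming movieDataset and movieNames from LowestRatedMovieDataFrame.py:
# one aggregation instead of two groupBys plus a join.
averagesAndCounts = movieDataset.groupBy("movieID").agg(
    functions.avg("rating").alias("avgRating"),
    functions.count("rating").alias("ratingCount"))
for movie in averagesAndCounts.orderBy("avgRating").take(10):
    print(movieNames[movie['movieID']], movie['avgRating'], movie['ratingCount'])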


MovieRecommendationsALS.py

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit

# Load up movie ID -> movie name dictionary
def loadMovieNames():
    movieNames = {}
    # Open the file so that non-ASCII characters in movie titles are dropped rather than
    # raising errors (the original str.decode('ascii', 'ignore') call only works on Python 2).
    with open("ml-100k/u.item", encoding="ascii", errors="ignore") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Convert u.data lines into (userID, movieID, rating) rows
def parseInput(line):
    fields = line.value.split()
    return Row(userID = int(fields[0]), movieID = int(fields[1]), rating = float(fields[2]))

if __name__ == "__main__":
    # Create a SparkSession
    spark = SparkSession.builder.appName("MovieRecs").getOrCreate()

    # Load up our movie ID -> name dictionary
    movieNames = loadMovieNames()

    # Get the raw data
    lines = spark.read.text("hdfs:///user/maria_dev/ml-100k/u.data").rdd

    # Convert it to an RDD of Row objects with (userID, movieID, rating)
    ratingsRDD = lines.map(parseInput)

    # Convert to a DataFrame and cache it
    ratings = spark.createDataFrame(ratingsRDD).cache()

    # Create an ALS collaborative filtering model from the complete data set
    als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID", ratingCol="rating")
    model = als.fit(ratings)

    # Print out ratings from user 0:
    print("\nRatings for user ID 0:")
    userRatings = ratings.filter("userID = 0")
    for rating in userRatings.collect():
        print(movieNames[rating['movieID']], rating['rating'])

    print("\nTop 20 recommendations:")
    # Find movies rated more than 100 times
    ratingCounts = ratings.groupBy("movieID").count().filter("count > 100")
    # Construct a "test" dataframe for user 0 with every movie rated more than 100 times
    popularMovies = ratingCounts.select("movieID").withColumn('userID', lit(0))

    # Run our model on that list of popular movies for user ID 0
    recommendations = model.transform(popularMovies)

    # Get the top 20 movies with the highest predicted rating for this user
    topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)

    for recommendation in topRecommendations:
        print(movieNames[recommendation['movieID']], recommendation['prediction'])

    spark.stop()
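As a hedged aside (not in the original answer): on newer Spark releases, the trained ALS model can produce top-N recommendations directly instead of scoring a hand-built popularMovies DataFrame; note that this ranks over all movies, not just those with more than 100 ratings. The sketch below assumes Spark 2.3+ (for recommendForUserSubset) and reuses spark, model, and movieNames from the script above.

# Sketch only: direct top-N recommendations from the fitted ALS model (Spark 2.3+).
# Assumes spark, model, and movieNames exist as in MovieRecommendationsALS.py.
from pyspark.sql import Row

users = spark.createDataFrame([Row(userID = 0)])
userRecs = model.recommendForUserSubset(users, 20)
for row in userRecs.collect():
    # 'recommendations' is an array of (movieID, rating) structs, best predictions first
    for rec in row['recommendations']:
        print(movieNames[rec['movieID']], rec['rating'])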


Using RDD (Spark 1.0)

LowestRatedPopularMovieSpark.py

from pyspark import SparkConf, SparkContext

# This function just creates a Python "dictionary" we can later
# use to convert movie IDs to movie names while printing out
# the final results.
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Take each line of u.data and convert it to (movieID, (rating, 1.0)).
# This way we can then add up all the ratings for each movie, and
# the total number of ratings for each movie (which lets us compute the average).
def parseInput(line):
    fields = line.split()
    return (int(fields[1]), (float(fields[2]), 1.0))

if __name__ == "__main__":
    # The main script - create our SparkContext
    conf = SparkConf().setAppName("WorstMovies")
    sc = SparkContext(conf = conf)

    # Load up our movie ID -> movie name lookup table
    movieNames = loadMovieNames()

    # Load up the raw u.data file
    lines = sc.textFile("hdfs:///user/maria_dev/ml-100k/u.data")

    # Convert to (movieID, (rating, 1.0))
    movieRatings = lines.map(parseInput)

    # Reduce to (movieID, (sumOfRatings, totalRatings))
    ratingTotalsAndCount = movieRatings.reduceByKey(lambda movie1, movie2: (movie1[0] + movie2[0], movie1[1] + movie2[1]))

    # Filter out movies rated 10 or fewer times
    popularTotalsAndCount = ratingTotalsAndCount.filter(lambda x: x[1][1] > 10)

    # Map to (movieID, averageRating)
    averageRatings = popularTotalsAndCount.mapValues(lambda totalAndCount: totalAndCount[0] / totalAndCount[1])

    # Sort by average rating
    sortedMovies = averageRatings.sortBy(lambda x: x[1])

    # Take the top 10 results
    results = sortedMovies.take(10)

    # Print them out:
    for result in results:
        print(movieNames[result[0]], result[1])
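As a quick sanity check (not part of the original file), the same pipeline can be exercised on a tiny in-memory RDD before pointing it at HDFS. The sample lines below are made up but follow the u.data layout (userID movieID rating timestamp), and the threshold is loosened because the toy data has only a handful of ratings; it assumes the sc and parseInput defined above.

# Sketch: test the pipeline on made-up data with sc.parallelize instead of the HDFS file.
sampleLines = ["1 50 5 881250949", "2 50 4 881250950", "3 172 1 881250951"]
testRatings = sc.parallelize(sampleLines).map(parseInput)
totals = testRatings.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
# Only three toy ratings, so use "at least 2" here instead of the real "> 10" threshold
popular = totals.filter(lambda x: x[1][1] >= 2)
print(popular.mapValues(lambda tc: tc[0] / tc[1]).collect())   # [(50, 4.5)]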


Using DataFrames (Spark 2.0)

LowestRatedPopularMovieDataFrame.py

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

def parseInput(line):
    fields = line.split()
    return Row(movieID = int(fields[1]), rating = float(fields[2]))

if __name__ == "__main__":
    # Create a SparkSession
    spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

    # Load up our movie ID -> name dictionary
    movieNames = loadMovieNames()

    # Get the raw data
    lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
    # Convert it to an RDD of Row objects with (movieID, rating)
    movies = lines.map(parseInput)
    # Convert that to a DataFrame
    movieDataset = spark.createDataFrame(movies)

    # Compute average rating for each movieID
    averageRatings = movieDataset.groupBy("movieID").avg("rating")

    # Compute count of ratings for each movieID
    counts = movieDataset.groupBy("movieID").count()

    # Join the two together (we now have movieID, count, and avg(rating) columns)
    averagesAndCounts = counts.join(averageRatings, "movieID")

    # Filter out movies rated 10 or fewer times
    popularAveragesAndCounts = averagesAndCounts.filter("count > 10")

    # Pull the top 10 results
    topTen = popularAveragesAndCounts.orderBy("avg(rating)").take(10)

    # Print them out, converting movie IDs to names as we go.
    for movie in topTen:
        print(movieNames[movie[0]], movie[1], movie[2])

    # Stop the session
    spark.stop()
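As one more hedged variation (not in the original answer), the ID-to-name lookup can also be done inside Spark by turning the movieNames dictionary into a small DataFrame and joining on movieID; this sketch reuses spark, movieNames, and popularAveragesAndCounts from the script above.

# Sketch: resolve titles with a DataFrame join instead of the Python dictionary at print time.
# Assumes spark, movieNames, and popularAveragesAndCounts from LowestRatedPopularMovieDataFrame.py.
nameRows = [Row(movieID = movieID, movieTitle = title) for movieID, title in movieNames.items()]
namesDataset = spark.createDataFrame(nameRows)
popularAveragesAndCounts.join(namesDataset, "movieID").orderBy("avg(rating)").show(10)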
