Question
Using RDD (Spark 1.0)
LowestRatedMovieSpark.py
from pyspark import SparkConf, SparkContext
# This function just creates a Python "dictionary" we can later
# use to convert movie ID's to movie names while printing out
# the final results.
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Take each line of u.data and convert it to (movieID, (rating, 1.0))
# This way we can then add up all the ratings for each movie, and
# the total number of ratings for each movie (which lets us compute the average)
def parseInput(line):
    fields = line.split()
    return (int(fields[1]), (float(fields[2]), 1.0))
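# For example, given a (hypothetical) tab-separated u.data line of the form
# "userID<TAB>movieID<TAB>rating<TAB>timestamp", parseInput would behave like:
#     parseInput("196\t242\t3\t881250949")  ->  (242, (3.0, 1.0))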
if __name__ == "__main__": # The main script - create our SparkContext conf = SparkConf().setAppName("WorstMovies") sc = SparkContext(conf = conf)
# Load up our movie ID -> movie name lookup table movieNames = loadMovieNames()
# Load up the raw u.data file lines = sc.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
# Convert to (movieID, (rating, 1.0)) movieRatings = lines.map(parseInput)
# Reduce to (movieID, (sumOfRatings, totalRatings)) ratingTotalsAndCount = movieRatings.reduceByKey(lambda movie1, movie2: ( movie1[0] + movie2[0], movie1[1] + movie2[1] ) )
# Map to (movieID, averageRating) averageRatings = ratingTotalsAndCount.mapValues(lambda totalAndCount : totalAndCount[0] / totalAndCount[1])
# Sort by average rating sortedMovies = averageRatings.sortBy(lambda x: x[1])
# Take the top 10 results results = sortedMovies.take(10)
# Print them out: for result in results: print(movieNames[result[0]], result[1]) Resolving the same problem with Spark 2.0 (Dataframe)
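Not part of the original script, but a quick way to convince yourself what the reduceByKey/mapValues combination above computes is to run the same combiner over a few made-up (rating, 1.0) pairs in plain Python:

from functools import reduce

# Hypothetical (rating, 1.0) pairs for a single movieID, as parseInput would emit them
pairs = [(3.0, 1.0), (5.0, 1.0), (1.0, 1.0)]

# Same combiner as the reduceByKey lambda: sum the ratings, sum the counts
totalRating, totalCount = reduce(lambda m1, m2: (m1[0] + m2[0], m1[1] + m2[1]), pairs)

# Same division as mapValues: total rating / number of ratings
print(totalRating / totalCount)   # 3.0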
Resolving the same problem with Spark 2.0 (Dataframe)
LowestRatedMovieDataFrame.py
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

def parseInput(line):
    fields = line.split()
    return Row(movieID = int(fields[1]), rating = float(fields[2]))
if __name__ == "__main__": # Create a SparkSession spark = SparkSession.builder.appName("PopularMovies").getOrCreate()
# Load up our movie ID -> name dictionary movieNames = loadMovieNames()
# Get the raw data lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data") # Convert it to a RDD of Row objects with (movieID, rating) movies = lines.map(parseInput) # Convert that to a DataFrame movieDataset = spark.createDataFrame(movies)
# Compute average rating for each movieID averageRatings = movieDataset.groupBy("movieID").avg("rating")
# Compute count of ratings for each movieID counts = movieDataset.groupBy("movieID").count()
# Join the two together (We now have movieID, avg(rating), and count columns) averagesAndCounts = counts.join(averageRatings, "movieID")
# Pull the top 10 results topTen = averagesAndCounts.orderBy("avg(rating)").take(10)
# Print them out, converting movie ID's to names as we go. for movie in topTen: print (movieNames[movie[0]], movie[1], movie[2])
# Stop the session spark.stop()
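As an optional variation (not part of the original solution), the two groupBy calls and the join can be collapsed into a single agg() pass, and aliasing the columns avoids having to refer to the generated "avg(rating)" name later. A minimal sketch, assuming the same movieDataset DataFrame as above:

from pyspark.sql import functions as F

# One aggregation pass producing both the average and the count per movieID
averagesAndCounts = movieDataset.groupBy("movieID").agg(
    F.avg("rating").alias("avgRating"),
    F.count("rating").alias("ratingCount"))

# Lowest average ratings first, same ordering as orderBy("avg(rating)") above
topTen = averagesAndCounts.orderBy("avgRating").take(10)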
Using the MovieLens dataset to recommend movies for a given user (with the machine learning library MLlib).
MovieRecommendationsALS.py
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit

# Load up movie ID -> movie name dictionary
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1].decode('ascii', 'ignore')
    return movieNames

# Convert u.data lines into (userID, movieID, rating) rows
def parseInput(line):
    fields = line.value.split()
    return Row(userID = int(fields[0]), movieID = int(fields[1]), rating = float(fields[2]))
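# Note: spark.read.text() (used below) yields Row objects with a single "value"
# column, which is why the raw text is reached through line.value here. A
# (hypothetical) row wrapping "196\t242\t3\t881250949" would come back as a Row
# with userID=196, movieID=242 and rating=3.0.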
if __name__ == "__main__": # Create a SparkSession spark = SparkSession.builder.appName("MovieRecs").getOrCreate()
# Load up our movie ID -> name dictionary movieNames = loadMovieNames()
# Get the raw data lines = spark.read.text("hdfs:///user/maria_dev/ml-100k/u.data").rdd
# Convert it to a RDD of Row objects with (userID, movieID, rating) ratingsRDD = lines.map(parseInput)
# Convert to a DataFrame and cache it ratings = spark.createDataFrame(ratingsRDD).cache()
# Create an ALS collaborative filtering model from the complete data set als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID", ratingCol="rating") model = als.fit(ratings)
# Print out ratings from user 0: print(" Ratings for user ID 0:") userRatings = ratings.filter("userID = 0") for rating in userRatings.collect(): print movieNames[rating['movieID']], rating['rating']
print(" Top 20 recommendations:") # Find movies rated more than 100 times ratingCounts = ratings.groupBy("movieID").count().filter("count > 100") # Construct a "test" dataframe for user 0 with every movie rated more than 100 times popularMovies = ratingCounts.select("movieID").withColumn('userID', lit(0))
# Run our model on that list of popular movies for user ID 0 recommendations = model.transform(popularMovies)
# Get the top 20 movies with the highest predicted rating for this user topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)
for recommendation in topRecommendations: print (movieNames[recommendation['movieID']], recommendation['prediction'])
spark.stop()
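The script fits ALS on the complete data set and never measures how good the predictions are. A common extension (not part of the original solution) is to hold out a test split and report RMSE with RegressionEvaluator; a minimal sketch, assuming the same ratings DataFrame and column names as above (coldStartStrategy requires Spark 2.2 or later):

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Hold out 20% of the ratings for testing
(train, test) = ratings.randomSplit([0.8, 0.2], seed=42)

# coldStartStrategy="drop" discards NaN predictions for users/movies unseen in training
als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID",
          ratingCol="rating", coldStartStrategy="drop")
model = als.fit(train)

# Root-mean-square error of predicted vs. actual ratings on the held-out split
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
print("RMSE on the test split:", evaluator.evaluate(model.transform(test)))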
Using RDD (Spark 1.0)
LowestRatedPopularMovieSpark.py
from pyspark import SparkConf, SparkContext
# This function just creates a Python "dictionary" we can later
# use to convert movie ID's to movie names while printing out
# the final results.
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Take each line of u.data and convert it to (movieID, (rating, 1.0))
# This way we can then add up all the ratings for each movie, and
# the total number of ratings for each movie (which lets us compute the average)
def parseInput(line):
    fields = line.split()
    return (int(fields[1]), (float(fields[2]), 1.0))

if __name__ == "__main__":
    # The main script - create our SparkContext
    conf = SparkConf().setAppName("WorstMovies")
    sc = SparkContext(conf = conf)
    # Load up our movie ID -> movie name lookup table
    movieNames = loadMovieNames()

    # Load up the raw u.data file
    lines = sc.textFile("hdfs:///user/maria_dev/ml-100k/u.data")

    # Convert to (movieID, (rating, 1.0))
    movieRatings = lines.map(parseInput)

    # Reduce to (movieID, (sumOfRatings, totalRatings))
    ratingTotalsAndCount = movieRatings.reduceByKey(lambda movie1, movie2: (movie1[0] + movie2[0], movie1[1] + movie2[1]))

    # Filter out movies rated 10 or fewer times
    popularTotalsAndCount = ratingTotalsAndCount.filter(lambda x: x[1][1] > 10)

    # Map to (movieID, averageRating)
    averageRatings = popularTotalsAndCount.mapValues(lambda totalAndCount: totalAndCount[0] / totalAndCount[1])

    # Sort by average rating
    sortedMovies = averageRatings.sortBy(lambda x: x[1])

    # Take the top 10 results
    results = sortedMovies.take(10)

    # Print them out:
    for result in results:
        print(movieNames[result[0]], result[1])
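As an aside (not part of the original solution), the sortBy-then-take pattern can be replaced with a single takeOrdered() call, which returns the ten smallest averages without materializing a fully sorted RDD; a minimal sketch assuming the same averageRatings RDD of (movieID, averageRating) pairs:

# Ten lowest-rated popular movies, ordered by ascending average rating
results = averageRatings.takeOrdered(10, key=lambda x: x[1])

for movieID, avgRating in results:
    print(movieNames[movieID], avgRating)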
Using Dataframe (Spark 2.0)
LowestRatedPopularMovieDataFrame.py
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

def parseInput(line):
    fields = line.split()
    return Row(movieID = int(fields[1]), rating = float(fields[2]))
if __name__ == "__main__": # Create a SparkSession spark = SparkSession.builder.appName("PopularMovies").getOrCreate()
# Load up our movie ID -> name dictionary movieNames = loadMovieNames()
# Get the raw data lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data") # Convert it to a RDD of Row objects with (movieID, rating) movies = lines.map(parseInput) # Convert that to a DataFrame movieDataset = spark.createDataFrame(movies)
# Compute average rating for each movieID averageRatings = movieDataset.groupBy("movieID").avg("rating")
# Compute count of ratings for each movieID counts = movieDataset.groupBy("movieID").count()
# Join the two together (We now have movieID, avg(rating), and count columns) averagesAndCounts = counts.join(averageRatings, "movieID")
# Filter movies rated 10 or fewer times popularAveragesAndCounts = averagesAndCounts.filter("count > 10")
# Pull the top 10 results topTen = popularAveragesAndCounts.orderBy("avg(rating)").take(10)
# Print them out, converting movie ID's to names as we go. for movie in topTen: print (movieNames[movie[0]], movie[1], movie[2])
# Stop the session spark.stop()
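One more optional variation (not part of the original solution): instead of looking titles up in the driver-side movieNames dictionary, the lookup table can itself be turned into a small DataFrame and joined in, so the names are resolved inside Spark. A minimal sketch, assuming the movieNames dict and the popularAveragesAndCounts DataFrame from the script above, run before spark.stop():

from pyspark.sql import Row

# Turn the movie ID -> title dict into a small DataFrame
nameRows = [Row(movieID=movieID, title=title) for movieID, title in movieNames.items()]
nameDataset = spark.createDataFrame(nameRows)

# Attach titles inside Spark and show the ten lowest-rated popular movies
popularAveragesAndCounts.join(nameDataset, "movieID").orderBy("avg(rating)").show(10)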