Taxi Trips (exercise)#
Kernel installation#
Run the following cell if the required kernel (ai4seismology-bigdata) is not installed yet.
You only need to install this kernel once.
%%bash
# Install the ai4seismology-bigdata Jupyter kernel for the current user:
# create the per-user kernel directory and copy the prepared kernel.json into it.
# Re-running is harmless: mkdir -p tolerates an existing directory and cp overwrites.
mkdir -p ~/.local/share/jupyter/kernels/ai4seismology-bigdata
cp /data/horse/ws/s4122485-ai4seismology_dev/thursday_bigdata/kernel.json ~/.local/share/jupyter/kernels/ai4seismology-bigdata
Important!!!#
Once the Kernel is installed,
Reload the notebook (reload/refresh the web page)
Select the kernel: Menu -> Kernel -> Change Kernel -> select “ai4seismology-bigdata”. Always use this kernel for the upcoming exercises.
Select ai4seismology-bigdata kernel#
# Install/upgrade the interactive map and plotting packages (ipympl, jupyter_leaflet,
# leafmap, ipyleaflet) into the user site-packages of the very interpreter that runs
# this kernel (sys.executable), so they are importable from this notebook.
import sys
!{sys.executable} -m pip install --user --upgrade ipympl jupyter_leaflet leafmap ipyleaflet
Restart the Jupyter Server#
# To enable horizontal scrolling
# Inject a CSS rule that disables line wrapping inside <pre> blocks, so wide text
# output is not wrapped and can be scrolled horizontally instead.
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))
Initialisation of Spark cluster#
# Note: Skip if running on local machine
# Import utilities required to run big data frameworks on ZIH HPC systems
from big_data_utils.environment_utils import ClusterConfig
from big_data_utils.cluster_utils import ClusterService
from big_data_utils.utils import kill_java_processes_by_name
# Note: Skip if running on local machine
# Configure the cluster environment
myconfig = ClusterConfig(fw_name="spark")
# Alternative: build the configuration from a shared template instead of the defaults.
#myconfig.configure_env(conf_dest="./my-conf", conf_template="/projects/p_scads_bigdatahpc/.template/spark")
# NOTE(review): conf_dest presumably is where the generated Spark config is written, and
# randomize_ports=True presumably picks random ports so clusters of several users on the
# same nodes do not collide — confirm in big_data_utils.
myconfig.configure_env(conf_dest="./my-conf",randomize_ports=True)
# Initialize the cluster service class
mycluster = ClusterService("spark")
# Check which processes are running
mycluster.check_status()
# Note: Skip if running on local machine
# Start Spark standalone cluster
mycluster.start_cluster()
# Note: Skip if running on local machine
# Check if the master and worker processes are started or not
mycluster.check_status()
Download of NYC taxi trips and taxi zone file#
Modify the base directory in the following cell if you want to save data files in different directories.
base_directory = "./data"

import os
import wget
import zipfile


def _download_if_missing(url_prefix, directory, file_name):
    """Download ``url_prefix + file_name`` into *directory* unless it already exists there.

    *directory* is created first if necessary.  Returns the local path of the file.
    """
    os.makedirs(directory, exist_ok=True)
    local_path = os.path.join(directory, file_name)
    if not os.path.exists(local_path):
        wget.download(url_prefix + file_name, out=directory)
    return local_path


# Resolve the data directory once and export it as $BASEDIRECTORY for child processes.
base_directory = os.path.abspath(base_directory)
os.environ["BASEDIRECTORY"] = base_directory

# Download yellow trip data (January 2022, Parquet)
data_directory = os.path.join(base_directory, "taxidata")
data_file = "yellow_tripdata_2022-01.parquet"
data_path = _download_if_missing(
    "https://d37ci6vzurychx.cloudfront.net/trip-data/", data_directory, data_file
)

# Download zone data: the taxi-zone shapefile archive and the zone lookup table
zone_directory = os.path.join(base_directory, "taxizonesdata")
zone_zipfile = "taxi_zones.zip"
zone_zipfile_path = _download_if_missing(
    "https://d37ci6vzurychx.cloudfront.net/misc/", zone_directory, zone_zipfile
)
# The `with` statement closes the archive; no explicit close() is needed.
with zipfile.ZipFile(zone_zipfile_path, "r") as zip_ref:
    zip_ref.extractall(zone_directory)

zone_lookup_file = "taxi_zone_lookup.csv"
_download_if_missing(
    "https://d37ci6vzurychx.cloudfront.net/misc/", zone_directory, zone_lookup_file
)
Initialisation of Spark context#
# findspark.init() makes the Spark installation in $SPARK_HOME importable; it has to run
# before pyspark is imported further down in this cell.
import findspark
import os
findspark.init(os.environ['SPARK_HOME'])
print(os.environ['SPARK_HOME'])
# NOTE(review): platform, pyspark and SparkContext are not used in the code shown here.
import platform
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
# Connect to the standalone cluster's master (spark://<host>:<port> from myconfig).
spark = SparkSession.builder \
.master(f"spark://{myconfig.get_master_host()}:{myconfig.get_master_port()}") \
.appName("Python Spark Map Visualization of NYC taxi trips") \
.getOrCreate()
# Low-level SparkContext of this session (stopped again at the end of the notebook).
sc = spark.sparkContext
# Check running java processes
mycluster.check_status()
# Load the downloaded yellow-taxi Parquet file into a Spark DataFrame.
trips = spark.read.parquet(data_path)
# Column names and their Spark types
trips.dtypes
# Print the first rows of the DataFrame
trips.show()
Grouping using groupBy#
# using groupBy
# Number of trips per VendorID (one output row per distinct vendor).
trips.groupBy("VendorID").count().show()
Exercise 1#
Count trips grouped by passengers.
Are there unexpected values? How can they be interpreted? Find more information on https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page.
# Exercise 1 — TODO: complete the expression (group the trips by passenger count and count each group).
# Placeholder: this line is not valid Python until it is completed.
trips. #add something here
Exercise 2#
Find the minimal trip distance for each of these groups. Are there unexpected values? How can they be interpreted?
# Exercise 2 — TODO: complete the expression (minimal trip distance per passenger-count group).
# Placeholder: this line is not valid Python until it is completed.
trips. #add something here
Exercise 3#
Remove all trips with a distance of 0.0 miles from the previous result. What do you expect?
# Exercise 3 — TODO: complete the expression (exclude trips with trip distance 0.0 before aggregating).
# Placeholder: this line is not valid Python until it is completed.
trips. #add something here
SQL Queries#
from pyspark.sql.types import *
# getOrCreate() returns the already running SparkSession (the `spark` session built
# earlier) instead of creating a new one, so despite its name `sqlContext` is that session.
sqlContext = SparkSession.builder.getOrCreate()
# Create a temporary view from the DataFrame
# (registers `trips` under the name "trips" so SQL queries can refer to it)
trips.createOrReplaceTempView("trips")
# Apply a SQL query
# Fare of every trip with a distance of at least 5 miles.
query = "SELECT fare_amount FROM trips WHERE trip_distance>=5"
sqlContext.sql(query).show()
Exercise 4#
Rewrite the previous statement without SQL, but with a functional statement.
# Exercise 4 — TODO: select fare_amount of trips with trip_distance >= 5 using DataFrame
# methods instead of SQL. Placeholder: not valid Python until it is completed.
trips. # add something here
# Compute summary statistics
# (count, mean, stddev, min and max for each column)
trips.describe().show()
Exercise 5#
Find the trip distance of all trips with tips larger than $5. Formulate a SQL query and apply it to the DataFrame.
# Exercise 5 — TODO: assign a SQL query string that runs against the temporary view "trips".
# Placeholder: this assignment is not valid Python until it is completed.
query = # add something here
sqlContext.sql(query).show()
Exercise 6#
Formulate a query to get total amount of trip for distances larger than 30 miles.
# Exercise 6 — TODO: assign a SQL query string (trips with a distance larger than 30 miles)
# that runs against the temporary view "trips".
# Placeholder: this assignment is not valid Python until it is completed.
query = # add something here
sqlContext.sql(query).show()
Exercise 7#
Create a box-and-whisker plot of the numerical columns. What do these say about the data?
# Exercise 7: draw one box-and-whisker plot per numerical column.
# Columns of type string / timestamp / timestamp_ntz are skipped; for every other column
# the five summary values of the box have to be taken from columnQuantiles.
# TODO (exercise): complete every line marked "add something here" — as given, this cell
# is not valid Python (the dict entries below also still need their commas).
%matplotlib inline
import matplotlib.pyplot as plt
for column in trips.dtypes:
name = column[0]
colType = column[1]
if colType != 'string' and colType != 'timestamp' and colType != 'timestamp_ntz':
columnQuantiles = trips. # add something here
print("{} quantiles: {}".format(name,columnQuantiles))
# Axes.bxp() takes one pre-computed stats dict per box: whislo/whishi are the whisker ends,
# q1/med/q3 the quartiles. No "fliers" entry is given, hence showfliers=False below.
stats = [{
"whislo": columnQuantiles # add something here
"q1": columnQuantiles # add something here
"med": columnQuantiles # add something here
"q3": columnQuantiles # add something here
"whishi": columnQuantiles # add something here
}]
fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(5,5), sharey=True)
axes.bxp(bxpstats=stats, showfliers=False)
axes.grid(True)
axes.set_title(name)
Exercise 8#
Provide an overview of the number of trips per weekday.
def barchart(dataRows, titleSuffix):
    """Draw a horizontal bar chart of trip counts and show it.

    Each element of *dataRows* must support item access by *titleSuffix*
    (the category value, e.g. a weekday or hour) and by ``'count'``.
    Bars are labelled "<category> (<count>)"; the first row is drawn at the top.
    """
    row_total = len(dataRows)
    labels = []
    counts = []
    for row in dataRows:
        labels.append(f"{row[titleSuffix]} ({row['count']})")
        counts.append(row['count'])
    # Highest y position for the first row, so the chart reads top-down in input order.
    y_positions = [row_total - 1 - idx for idx in range(row_total)]
    plt.grid()
    plt.barh(y_positions, counts, align="center")
    plt.yticks(y_positions, labels)
    plt.xlabel("Number of trips")
    plt.title("Distribution of trips per " + titleSuffix)
    plt.show()
# Show the documentation of datetime.weekday(), which the weekday UDF below relies on.
import datetime
help(datetime.datetime.weekday)
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
import calendar
#udf stands for user defined function
# weekdayStr is meant to return the weekday *name* of timestamp d; @udf without a
# returnType makes it a string-valued UDF.
# TODO (exercise): as given it returns the whole calendar.day_name sequence; complete the
# expression so that it returns the name belonging to d's weekday.
@udf
def weekdayStr(d):
return calendar.day_name # add something here
@udf(returnType=IntegerType())
def weekday(d):
    """Return the weekday of timestamp *d* (Monday == 0 ... Sunday == 6).

    Spark hands SQL NULL values to a Python UDF as ``None``; return ``None``
    (NULL) for them instead of failing the whole job with an AttributeError.
    """
    if d is None:
        return None
    return d.weekday()
#Replace function weekday with function weekdayStr if you want.
# TODO (exercise 8): complete the chain so that weekdayRows is a list of rows with the
# fields "weekday" and "count" (the fields barchart() reads via item[...]).
# Placeholder: this line is not valid Python until it is completed.
weekdayRows = trips.select(weekday(trips.tpep_dropoff_datetime).alias("weekday")). # add something here
barchart(weekdayRows, "weekday")
Exercise 9#
Provide an overview of the number of trips per hour.
@udf(returnType=IntegerType())
def hour(d):
    """Return the hour of day (0-23) of timestamp *d*.

    Spark hands SQL NULL values to a Python UDF as ``None``; return ``None``
    (NULL) for them instead of failing the whole job with an AttributeError.
    """
    if d is None:
        return None
    return d.hour
# Exercise 9 — TODO: assign a list of rows with the fields "hour" and "count" (the fields
# barchart() reads), e.g. built with the hour UDF above.
# Placeholder: this assignment is not valid Python until it is completed.
hourRows = # add something here
barchart(hourRows, "hour")
Map Visualisations#
import leafmap


def getMap(**overrides):
    """Create a leafmap ``Map`` centred on New York City.

    Keyword arguments in *overrides* replace or extend the default settings
    below (e.g. ``getMap(zoom=10, height="600px")``); calling ``getMap()``
    without arguments produces the same map as before.
    """
    map_args = {
        "google_map": "HYBRID",
        # Center on New York City at about 40.70° N, 74.01° W ([lat, lon]).
        "center": [40.702557, -74.012318],
        "zoom": 12,
        "height": "450px",
        "width": "800px",
        # Numeric like "zoom" (was the string "20").
        "max_zoom": 20,
    }
    map_args.update(overrides)
    return leafmap.Map(**map_args)


getMap()
def taxizoneColorFunction(taxiZonesIntensity, maximum_intensity, taxizoneFeature):
    """Return the leaflet style for one taxi-zone feature.

    The fill colour is a shade of red whose brightness is the zone's intensity
    relative to *maximum_intensity* (black = 0, pure red = maximum).

    Args:
        taxiZonesIntensity: mapping from taxi-zone ``LocationID`` to an intensity
            (e.g. a trip count); zones missing from the mapping count as 0.
        maximum_intensity: the intensity that maps to full red; 0 gives black.
        taxizoneFeature: a GeoJSON feature whose ``properties`` contain ``LocationID``.

    Returns:
        A style dict with a black outline and a ``#RR0000`` fill colour.
    """
    taxizoneId = taxizoneFeature["properties"]["LocationID"]
    taxizoneIntensity = taxiZonesIntensity.get(taxizoneId, 0)
    # An all-zero (or empty) intensity map would otherwise divide by zero.
    if maximum_intensity:
        red = int(taxizoneIntensity * 255 / maximum_intensity)
    else:
        red = 0
    # Clamp so the value always fits the two hex digits of the red channel.
    red = max(0, min(255, red))
    return {
        "color": "black",
        "fillColor": '#%02X0000' % red,
    }


def getTaxiZoneStylingFunction(taxiZonesIntensity):
    """Return a ``style_callback`` that shades every taxi zone by its intensity.

    Intensities are normalised by the largest value in *taxiZonesIntensity*;
    an empty mapping yields an all-black map instead of raising ``ValueError``.
    """
    maximum_intensity = max(taxiZonesIntensity.values(), default=0)
    return lambda feature: taxizoneColorFunction(taxiZonesIntensity, maximum_intensity, feature)
# Print the interpreter running this kernel (handy to verify the selected kernel).
import sys
print(sys.executable)
# Taxi-zone shapefile in the directory the zone archive was extracted into
# (presumably taxi_zones.zip contains taxi_zones.shp at its top level — confirm).
taxizonesFile = base_directory+"/taxizonesdata/taxi_zones.shp"
def _iter_positions(coordinates):
    """Yield every position (``(lon, lat[, z])``) of a nested GeoJSON coordinate array.

    Works for any nesting depth (Polygon, MultiPolygon, ...) and for positions
    stored as tuples or lists, with or without a third (z) value.
    """
    if coordinates and not isinstance(coordinates[0], (list, tuple)):
        # Innermost array: its elements are numbers, so this is one position.
        yield coordinates
        return
    for part in coordinates:
        yield from _iter_positions(part)


def getZoneCenters(geojson=None):
    """Return ``{LocationID: [lat, lon]}`` with an approximate center per taxi zone.

    The center is the arithmetic mean of all vertices of the zone's geometry
    (every ring of every polygon), not the area-weighted centroid.  Zones
    without a geometry or without any vertices are skipped.

    Args:
        geojson: an already loaded GeoJSON FeatureCollection (dict). If omitted,
            the shapefile ``taxizonesFile`` is converted with
            ``leafmap.shp_to_geojson``.
    """
    if geojson is None:
        geojson = leafmap.shp_to_geojson(taxizonesFile)
    zone_centers = {}
    for feature in geojson["features"]:
        location = feature["properties"]["LocationID"]
        geometry = feature.get("geometry")
        if not geometry:
            continue
        positions = list(_iter_positions(geometry["coordinates"]))
        if not positions:
            continue
        count = len(positions)
        # GeoJSON positions are (lon, lat); the result is stored as [lat, lon].
        avg_lat = sum(position[1] for position in positions) / count
        avg_lon = sum(position[0] for position in positions) / count
        zone_centers[location] = [avg_lat, avg_lon]
    return zone_centers
# Compute the approximate center of every taxi zone once; reused by the heat maps and trip lines below.
zoneCenters = getZoneCenters()
def getHeatCenters(taxizoneIntensityMap, centers=None):
    """Build heat-map points ``[lat, lon, intensity]``, one per taxi zone.

    Args:
        taxizoneIntensityMap: mapping from taxi-zone ``LocationID`` to an
            intensity (e.g. a trip count); zones missing from it get 0.
        centers: mapping ``LocationID -> [lat, lon]``; defaults to the
            module-level ``zoneCenters``.

    Returns:
        A list of ``[lat, lon, intensity]`` lists in the iteration order of *centers*.
    """
    if centers is None:
        centers = zoneCenters
    return [
        [lat, lon, taxizoneIntensityMap.get(location, 0)]
        for location, (lat, lon) in centers.items()
    ]
Exercise 10#
Get the number of trips which start/end in each zone.
# Exercise 10 — TODO: complete both expressions. pickupData / dropoffData must be rows with a
# "count" field plus "PULocationID" / "DOLocationID" respectively (read by the dict
# comprehensions below). Placeholders: not valid Python until they are completed.
pickupData = trips. #add something here
dropoffData = trips. #add something here
# LocationID -> number of trips starting / ending in that zone
grouped_by_pickup_location={row["PULocationID"]:row["count"] for row in pickupData}
grouped_by_dropoff_location={row["DOLocationID"]:row["count"] for row in dropoffData}
# Pick-up map: zones shaded red by their pick-up count, plus a heat layer at the zone centers.
m = getMap()
m.add_shp(in_shp=taxizonesFile,layer_name="taxizone",style={},hover_style={}, style_callback=getTaxiZoneStylingFunction(grouped_by_pickup_location), fill_colors=None,
info_mode='on_hover')
m.layer_opacity('taxizone', 0.9)
m.add_heatmap(data=getHeatCenters(grouped_by_pickup_location), name='pickup_heat', radius=10)
m.layer_opacity('pickup_heat', 0.9)
m
# Drop-off map: zones shaded red by their drop-off count, plus a heat layer at the zone centers.
m = getMap()
m.add_shp(in_shp=taxizonesFile,layer_name="taxizone",style={},hover_style={}, style_callback=getTaxiZoneStylingFunction(grouped_by_dropoff_location), fill_colors=None,
info_mode='on_hover')
m.layer_opacity('taxizone', 0.9)
m.add_heatmap(data=getHeatCenters(grouped_by_dropoff_location), name='dropoff_heat', radius=10)
m.layer_opacity('dropoff_heat', 0.9)
m
Exercise 11#
Collect the trips with the 10 highest tips. Be careful not to use trips with zones which indicate “Unknown” values.
# Exercise 11: load the taxi-zone lookup table and inspect which zones stand for
# unknown locations (Borough "Unknown" or "N/A").
zoneLookup = spark.read.csv(base_directory + "/taxizonesdata/taxi_zone_lookup.csv", header=True, inferSchema=True)
zoneLookup.filter(zoneLookup.Borough == "Unknown").show()
zoneLookup.filter(zoneLookup.Borough == "N/A").show()
zoneLookup.dtypes
trips.dtypes
help(trips.join)
# add something here to filter out Unknown values
# TODO (exercise): the step above has to define `temporary` — it is not defined in the code
# above. The resulting top-10 rows are later read via trip["PULocationID"],
# trip["DOLocationID"] and the .isoformat() of the two tpep_* timestamps.
# Placeholder: the next line is not valid Python until it is completed.
tripsWithHighestTips = temporary. # add something here to take the top 10 elements
tripsWithHighestTips
from geojson import FeatureCollection, Feature, LineString
def to_lon_and_lat(latLonCoordinate):
    """Reorder a ``[lat, lon]`` pair into GeoJSON's ``[lon, lat]`` order."""
    lat_index, lon_index = 0, 1
    return [latLonCoordinate[lon_index], latLonCoordinate[lat_index]]
def trip_to_geojson(trip):
    """Convert one trip row into a GeoJSON ``LineString`` feature.

    The line runs from the center of the pickup zone to the center of the
    drop-off zone (both looked up in ``zoneCenters``); the properties hold the
    pickup/drop-off times as ISO-8601 strings and the two zone IDs.
    """
    path = [
        to_lon_and_lat(zoneCenters[trip[zone_field]])
        for zone_field in ("PULocationID", "DOLocationID")
    ]
    props = dict(
        starttime=trip["tpep_pickup_datetime"].isoformat(),
        startzone=trip["PULocationID"],
        endtime=trip["tpep_dropoff_datetime"].isoformat(),
        endzone=trip["DOLocationID"],
    )
    return Feature(geometry=LineString(path), properties=props)
def tripList_to_geojson(tripList):
    """Collect every trip of *tripList* as a feature of one GeoJSON ``FeatureCollection``."""
    features = [trip_to_geojson(trip) for trip in tripList]
    return FeatureCollection(features)
# Draw the taxi zones and, in red, one straight line per top-tip trip from the
# center of its pickup zone to the center of its drop-off zone.
trip_geojson = tripList_to_geojson(tripsWithHighestTips)
m = getMap()
m.add_shp(in_shp=taxizonesFile,layer_name="taxizone")
m.layer_opacity('taxizone', 0.9)
m.add_geojson(in_geojson=trip_geojson,layer_name="connections", style={"color":"red"})
m.layer_opacity('connections', 1.0)
m
# Shutdown sequence: stop this notebook's Spark context, stop the standalone cluster,
# then kill any leftover SparkSubmit Java processes; check_status() shows the effect of each step.
# Check status of running java processes
mycluster.check_status()
# Stopping spark context
sc.stop()
mycluster.check_status()
mycluster.stop_cluster()
kill_java_processes_by_name("SparkSubmit")
mycluster.check_status()