HADOOP|Distributed System|Spark|Python|分布式系统代写
Jupyter Server: localPython 3: Idle
DS/CMPSC 410 MiniProject #3
Spring 2020
Instructor: John Yen
TA: Rupesh Prajapati
Learning Objectives
- Be able to identify top k services that are open for scanners in the Darknet dataset
- Be able to identify frequent 2-service sets (based on top 10 services) and identify potentially interesting patterns
- Be able to compute 3-service sets (based on top 10 services) that are open for scanners in the Darknet dataset
Total points: 30
- Exercise 1: 5 points
- Exercise 2: 10 points ( 5 points for part a, 5 points for part b)
- Exercise 3: 15 points (10 points for part a, 5 points for part b)
Due: 5 pm, April 17, 2020.
[-]import pysparkimport csv[-]from pyspark import SparkContextfrom pyspark.sql import SparkSessionfrom pyspark.sql.types import StructField, StructType, StringType, LongTypefrom pyspark.sql.functions import col, columnfrom pyspark.sql.functions import exprfrom pyspark.sql.functions import splitfrom pyspark.sql import Rowfrom pyspark.ml import Pipelinefrom pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, IndexToStringfrom pyspark.ml.clustering import KMeans[-]ss = SparkSession.builder.master(“local”).appName(“FrequentPortSet”).getOrCreate()[-]Scanners_df = ss.read.csv(“/storage/home/juy1/work/Darknet/scanners-dataset1-anon.csv”, header= True, inferSchema=True )
We can use printSchema() to display the schema of the DataFrame Scanners_df to see whether it was inferred correctly.
[-]Scanners_df.printSchema()
Part A Transfosrm the feature “host_services_per_censys” into an array of services.
The original value of the column is a string that connects all the ports scanned by a scanner. The different services that are open by a scanner are connected by dash “-“. For example, “81-161-2000” indicates the scanner has three ports/services open: port 81, port 161, and port 2000. Therefore, we want to use split to separate it into an array of ports/services open by each scanner. This transformation is important because it enables the identification of frequent service sets open among scanners.
The original value of the column “host_services_per_censys”
[-]Scanners_df.select(“host_services_per_censys”).show(30)
We want to find the top 10 services/ports that most scanners have them open. There are multiple ways to find this information using Spark. We will demonstrate one below, which involves the following steps: (The first five steps are identical to those for Mini Project Deliverable #2)
- Convert the DataFrame Scanners_df into an RDD
- Map to each row of the DF-converted RDD to extract the column “host_services_per_censys”. Save the mapping result in a new RDD (which contains only values of the column).
- Use flatMap to split to the string (using “-” as the delimiter) to convert the RDD into an RDD of ports/services that are open on the host of the scanner.
- Use map to generate a key-value pair RDD, where key is a port/service opens on a scanner, the value is 1.
- Use reduceByKey to count the total number of scanners that have a specific port/service open.
- Sort the services, select the top 10 services.
[-]# Step 1: Convert the DataFrame Scanners_df into an RDDfrom pyspark.sql.functions import splitScanners_RDD = Scanners_df.rddScanners_RDD.persist().take(5)[-]# Step 2: Map to each row of the DF-converted RDD to extract the column “host_services_per_censys”. Save the mapping result # in a new RDD (whih contains only values of the column)Host_services_column = Scanners_RDD.map(lambda row: row.host_services_per_censys)Host_services_column.persist().take(10)
We can transform the string into a list of services (i.e., port numbers) that the scanner has opened using map.
[-]Host_services_rdd=Host_services_column.map(lambda string: string.split(“-“))Host_services_rdd.persist().take(10)
Step 3: However, in order to count how many scanners are keeping a specific port/service open, it is easier to use flatMap (instead of map above) to “flatten” the results of splitting (using “-” as the delimiter to convert the input RDD into an RDD of all ports/services that are open on the host of each sourceIP.
[-]Host_services_f_list = Host_services_column.flatMap(lambda string: string.split(“-“))
Step 4: Use map to generate a key-value pair RDD, where key is a port/service opens on a scanner, the value is 1.
[-]Host_services_count = Host_services_f_list.map(lambda s: (s, 1) ) Host_services_count.take(10)
Step 5: Use reduceByKey to count the total number of scanners that have a specific port/service open.
- Calculate the total number of scanners that have each port/service open.
- Sort them in descending order of count so that we can see the port/services that are open for most scanners. Save the resulted file in a directory you specified.
[-]Host_services_total= Host_services_count.reduceByKey(lambda a, b: a+b, 1)Host_services_total.persist().take(10)[-]## Step 6: Sort the port/service in descending order.Count_Services = Host_services_total.map(lambda x: tuple(reversed(x)) )Sorted_Count_Services = Count_Services.sortByKey(ascending=False)Sorted_Count_Services.persist().top(10)[-]Sorted_Count_Services.saveAsTextFile(“/storage/home/juy1/work/Darknet/Sorted_Service_Counts_by_Scanners10”)[-]Sorted_Services=Sorted_Count_Services.map(lambda x: x[1]).collect()Sorted_Services
Exercise 1: (5 points) Checking that the looping over the top 10 services to find frequent 2-service sets open by scanners is correct. Since the order of the service in the 2-service set does not matter, you do not want to loop over a pair twice. For example, you do not want to include both “80 and 443” and “443 and 80”.
[-]## Code for Exercise 1for i in range(?,?): for j in range(?,?): print(Sorted_Services[i]+ ” and ” + Sorted_Services[j])
Par B Creating Service_Array column and 10 new columns to record whether each of top 10 services is open for each scanner.
B.1 Before we loop over the 2-service set, we need to first create the Service_Array column so that we can later check whether the array contains any of the specified services.
[-]Scanners_df2=Scanners_df.withColumn(“Services_Array”, split(col(“host_services_per_censys”), “-“) )Scanners_df2.persist().show(1)
B.2 We create a new column to record whether each scanner has the top service (i.e., port 80) open using array_contains
[-]from pyspark.sql.functions import array_containsScanners_df2=Scanners_df2.withColumn(“Top1_service”,array_contains(“Services_Array”,\ Sorted_Services[0]))
We want to double check the total number of rows with “Top1_service” being true is the same as the number of scanners whose port 80 is open (calculated earlier to be 19362)
[-]Scanners_df_Top1=Scanners_df2.where(“Top1_service”)[-]from pyspark.sql.functions import countDistinctScanners_df_Top1.select(countDistinct(“sourceIP”)).show()
B.3 We are now ready to add other columns for other top services
The names of the new columns are Top2_service, Top3_service, …, Top10_service
[-]for i in range(1, 10): new_column_name=”Top”+str(i+1)+”_service” Scanners_df2=Scanners_df2.withColumn(new_column_name,array_contains(“Services_Array”,\ Sorted_Services[i]))
We can check whether these new columns are actually created using show
[-]Scanners_df2.show(2)
Part C Determine the frequency of 2-service set (i.e., the number of scanners who have both services open for each 2-service set)
[-]import numpy as npfreq_table=np.empty([10,10]) for i in range(0,10): for j in range(i+1,10): ### Calculate the frequency of 2-service set print(“Frequency of “+Sorted_Services[i]+” and ” + Sorted_Services[j] +”:”) column_name_a=”Top”+str(i+1)+”_service” column_name_b=”Top”+str(j+1)+”_service” Scanners_df_temp= Scanners_df2.where(column_name_b).where(column_name_a) freq_table[i,j]=Scanners_df_temp.count() print(freq_table[i,j])
Exercise 2 (10 points) (a) List 2-service set that has at least 1000 scanners. List their frequency as well. (b) Identify at least one or two 2-service set that you believe is interesting. Explain why you think it is interesting.
Answer to Exercise 2 (a):
Answer to Exercise 2 (b):
Part D Calculate the frequency of 3-service set
[-]
Exercise 3 (15 points) (a) Calculate the frequency of 3-service set for top 10 services. (10 points) (b) List all three-service set that are open for at least 1000 scanners. (5 points)
[-]## Code for Exercise 3 (a)[-]## Answer for Exercise 3 (b)[-]