Monday, 29 August 2016

Google Viz

[GeoChart of the StateMilk dataset, generated with the googleVis package (0.6.0) under R 3.3.1]

Friday, 19 August 2016

Charts

First web chart

[Chart comparing "Pizza Lover" and "Ice Cream Lover" counts]

Thursday, 4 August 2016

Baseball Analytics: An Introduction to Sabermetrics using Python



Sabermetrics is the application of statistical analysis to baseball data in order to measure in-game activity. The term comes from SABR (the acronym of the Society for American Baseball Research) and metrics (as in econometrics).
The movie Moneyball focuses on the “quest for the secret of success in baseball”. It follows a low-budget team, the Oakland Athletics, who believed that underused statistics, such as a player’s ability to get on base, better predict the ability to score runs than typical statistics like home runs, RBIs (runs batted in), and batting average. Obtaining players who excelled in these underused statistics turned out to be much more affordable for the team.
In 2003, Michael Lewis published Moneyball, a book about Billy Beane, the Oakland Athletics' General Manager since 1997. The book was centered on Billy Beane's use of Sabermetrics to identify and recruit under-valued baseball players. With this strategy, his team achieved as many wins as teams with more than double the payroll. The figures below show the relationship between team salaries and number of wins for the years 1997, 2001, 2003 and 2013. The green dot represents the Oakland Athletics, the blue dot the New York Yankees, and the red dot the Boston Red Sox. We can see that the Oakland Athletics went from being an underperforming team in 1997 to becoming a highly competitive team with a number of wins comparable to the New York Yankees. The Oakland Athletics made the play-offs in four successive years: 2000, 2001, 2002 and 2003.


Getting the data and setting up your machine

For this blog post, I will use Lahman's Baseball Database and the Python programming language to explain some of the techniques used in Sabermetrics. This database contains complete batting and pitching statistics from 1871 to 2013, plus fielding statistics, standings, team stats, managerial records, post-season data, and more. You can download the data from this link. I will be using two files from this dataset: Salaries.csv and Teams.csv. To execute the code from this blog, I will use five Python libraries: NumPy, SciPy, Pandas, Matplotlib and statsmodels.
NumPy - an open-source extension module for Python. It provides fast pre-compiled functions for numerical routines, adds support for large, multi-dimensional arrays and matrices, and supplies a large library of high-level mathematical functions to operate on these arrays.

SciPy - widely used in scientific and technical computing. It contains modules for optimization, linear algebra, integration, interpolation, special functions, FFT, signal and image processing, ODE solvers and other tasks common in science and engineering.

Pandas - a Python library for data manipulation and analysis. In particular, it offers data structures and operations for manipulating numerical tables and time series. Pandas is free software released under the three-clause BSD license.


Matplotlib - a plotting library for Python built on NumPy. It provides an object-oriented API for embedding plots into applications using general-purpose GUI toolkits like wxPython, Qt, or GTK+.

Statsmodels - a Python module that allows users to explore data, estimate statistical models, and perform statistical tests. An extensive list of descriptive statistics, statistical tests, plotting functions, and result statistics is available for different types of data and each estimator.



Load the data into Jupyter Notebook:
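The notebook cells for this step are not reproduced here, so the following is a minimal sketch of how the two files could be loaded and the salary-versus-wins figures rebuilt with pandas and matplotlib. The file paths and the 'OAK', 'NYA' and 'BOS' team codes are assumptions based on the Lahman database conventions.

import pandas as pd
import matplotlib.pyplot as plt

# Assumed paths: adjust to wherever the Lahman CSV files were unpacked
salaries = pd.read_csv("Salaries.csv")   # yearID, teamID, lgID, playerID, salary
teams = pd.read_csv("Teams.csv")         # includes yearID, teamID and W (wins)

# Total payroll per team and season
payroll = salaries.groupby(["yearID", "teamID"])["salary"].sum().reset_index()
stats = pd.merge(payroll, teams[["yearID", "teamID", "W"]], on=["yearID", "teamID"])

# One salary-vs-wins scatter plot per selected season, highlighting three teams
for year in [1997, 2001, 2003, 2013]:
    season = stats[stats["yearID"] == year]
    plt.scatter(season["salary"] / 1e6, season["W"], c="grey")
    for team, colour in [("OAK", "green"), ("NYA", "blue"), ("BOS", "red")]:
        row = season[season["teamID"] == team]
        plt.scatter(row["salary"] / 1e6, row["W"], c=colour)
    plt.xlabel("Team payroll (millions of dollars)")
    plt.ylabel("Wins")
    plt.title("Salaries vs wins, %d" % year)
    plt.show()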


KDD CUP 1999 Context Using Spark and Python

Datasets:

We will be using datasets from the KDD Cup 1999.


This work is based on https://github.com/jadianes/spark-py-notebooks


Dataset Description:

Since 1999, KDD'99 has been the most widely used data set for the evaluation of anomaly detection methods. It was prepared by Stolfo et al. and is built from the data captured in the DARPA'98 IDS evaluation program. DARPA'98 comprises about 4 gigabytes of compressed raw (binary) tcpdump data from 7 weeks of network traffic, which can be processed into about 5 million connection records of roughly 100 bytes each. The two weeks of test data yield around 2 million connection records. The KDD training dataset consists of approximately 4,900,000 single connection vectors, each of which contains 41 features and is labeled as either normal or an attack, with exactly one specific attack type.

The simulated attacks fall in one of the following four categories:


1) Denial of Service Attack (DoS): is an attack in which the attacker makes some computing or memory resource too busy or too full to handle legitimate requests, or denies legitimate users access to a machine. 


2) User to Root Attack (U2R): is a class of exploit in which the attacker starts out with access to a normal user account on the system (perhaps gained by sniffing passwords, a dictionary attack, or social engineering) and is able to exploit some vulnerability to gain root access to the system. 


3) Remote to Local Attack (R2L): occurs when an attacker who has the ability to send packets to a machine over a network but who does not have an account on that machine exploits some vulnerability to gain local access as a user of that machine. 


4) Probing Attack: is an attempt to gather information about a network of computers for the apparent purpose of circumventing its security controls. 


This work is divided into 10 tasks, which include RDD creation, RDD sampling, RDD set operations, MLlib logistic regression, and more.

Notebooks

The following notebooks can be examined individually, although there is a more or less linear 'story' when they are followed in sequence. They all use the same dataset to solve a related set of tasks.

RDD creation

About reading files and parallelizing in-memory collections.
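As a minimal sketch (assuming the local file path used later in this post, and a SparkContext already available as sc), an RDD can be created either from a file or from an in-memory Python collection:

# Create an RDD from a (gzipped) text file: one element per line
data_file = "/home/osboxes/Python with Spark - part 1/pydata/kddcup.data.gz"
raw_data = sc.textFile(data_file)

# Create an RDD from an existing Python collection
a_range = range(100)
parallelized_data = sc.parallelize(a_range)

print "First record: {}".format(raw_data.take(1))
print "Parallelized count: {}".format(parallelized_data.count())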

RDDs basics

A look at map, filter, and collect.
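A small sketch of these three operations, assuming the raw_data RDD created above:

# filter: keep only the interactions tagged as 'normal.'
normal_raw_data = raw_data.filter(lambda line: 'normal.' in line)

# map: parse every comma-separated line into a list of fields
csv_data = raw_data.map(lambda line: line.split(","))

# actions bring results back to the driver (take is safer than collect on big RDDs)
print "Normal interactions: {}".format(normal_raw_data.count())
print "First parsed record: {}".format(csv_data.take(1))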






Sampling RDDs

RDD sampling methods explained.
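For example (assuming raw_data as before), sample is a transformation returning a new RDD, while takeSample is an action returning a local list:

# ~10% sample, without replacement, with a fixed seed for reproducibility
raw_data_sample = raw_data.sample(False, 0.1, 1234)
print "Sample size is {}".format(raw_data_sample.count())

# 400 random elements brought back to the driver as a Python list
raw_data_local_sample = raw_data.takeSample(False, 400, 1234)
print "Local sample size is {}".format(len(raw_data_local_sample))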

RDD set operations

Brief introduction to some of the RDD pseudo-set operations.
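For instance, subtract can be used to obtain the attack interactions as the complement of the normal ones (a sketch assuming raw_data as before):

# Everything that is not a 'normal.' interaction
normal_raw_data = raw_data.filter(lambda line: 'normal.' in line)
attack_raw_data = raw_data.subtract(normal_raw_data)

print "Attack interactions: {}".format(attack_raw_data.count())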





Data aggregations on RDDs

RDD actions reduce, fold, and aggregate.
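A sketch of reduce and aggregate on the interaction durations (the first field of each record), assuming raw_data as before:

# Durations as an RDD of floats
durations = raw_data.map(lambda line: float(line.split(",")[0]))

# reduce: total duration
total_duration = durations.reduce(lambda a, b: a + b)

# aggregate: sum and count in a single pass, then the mean duration
sum_count = durations.aggregate(
    (0.0, 0),                                          # initial (sum, count)
    lambda acc, value: (acc[0] + value, acc[1] + 1),   # add a value to the accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1]))           # merge two accumulators

print "Mean duration is {}".format(round(sum_count[0] / float(sum_count[1]), 3))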

Working with key/value pair RDDs

How to deal with key/value pairs in order to aggregate and explore data.
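A minimal sketch, using the attack tag (the last field of each record) as key and the duration as value:

# (tag, duration) pairs, e.g. ('normal.', 215.0)
key_value_duration = raw_data.map(
    lambda line: (line.split(",")[41], float(line.split(",")[0])))

# Total duration per tag with reduceByKey
durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)

# Number of interactions per tag (returns a local dictionary)
counts_by_key = key_value_duration.countByKey()

print durations_by_key.collect()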







MLlib: Basic Statistics and Exploratory Data Analysis


A notebook introducing Local Vector types, basic statistics in MLlib for Exploratory Data Analysis and model selection.
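The correlation matrix shown below (Out[23]) can be obtained by applying Statistics.corr to an RDD of numeric feature vectors and wrapping the result in a pandas DataFrame. A sketch, assuming the raw_data RDD created from the KDD file as in the notebooks that follow:

import pandas as pd
from pyspark.mllib.stat import Statistics

# Keep just the numeric columns (drop protocol, service, flag and the tag)
def parse_numeric(line):
    fields = line.split(",")
    return [float(x) for x in fields[0:1] + fields[4:41]]

numeric_data = raw_data.map(parse_numeric)

# Pearson correlation matrix (a numpy array), wrapped for readability
correlation_matrix = Statistics.corr(numeric_data, method="pearson")
corr_df = pd.DataFrame(correlation_matrix)   # set index/columns to the 38 feature names if desired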

Out[23]:
[38 × 38 Pearson correlation matrix of the numeric features (duration, src_bytes, dst_bytes, land, wrong_fragment, ..., dst_host_rerror_rate, dst_host_srv_rerror_rate), with 1.0 on the diagonal. Notable strong pairs include serror_rate / srv_serror_rate (0.99), rerror_rate / srv_rerror_rate (0.98), count / srv_count (0.95), dst_bytes / logged_in (0.88) and src_bytes / dst_host_same_src_port_rate (0.82).]


Out[24]:
[Boolean dataframe over the 25 features that show at least one strong correlation, flagging each highly correlated pair with True. Flagged pairs include src_bytes / dst_host_same_src_port_rate, dst_bytes / logged_in, hot / num_compromised, num_outbound_cmds / is_hot_login, count / srv_count, serror_rate / srv_serror_rate, rerror_rate / srv_rerror_rate, and the corresponding dst_host_* rate combinations.]

Conclusions and possible model selection hints

The previous dataframe showed us which variables are highly correlated. We have kept just those variables with at least one strong correlation. We can use this information as we please, but a good approach is to do some model selection. That is, if we have a group of variables that are highly correlated, we can keep just one of them to represent the group, under the assumption that they convey similar information as predictors. Reducing the number of variables will not improve our model's accuracy, but it will make the model easier to understand and more efficient to compute.
For example, from the description of the KDD Cup 99 task we know that the variable dst_host_same_src_port_rate references the percentage of the last 100 connections to the same port, for the same destination host. In our correlation matrix (and auxiliary dataframes) we find that it is highly and positively correlated with src_bytes and srv_count. The former is the number of bytes sent from source to destination; the latter is the number of connections to the same service as the current connection in the past 2 seconds. We might decide not to include dst_host_same_src_port_rate in our model if we include the other two, as a way to reduce the number of variables and, later on, better interpret our models.
Later on, in those notebooks dedicated to build predictive models, we will make use of this information to build more interpretable models.
MLlib: Logistic Regression
Labeled points and Logistic Regression classification of network attacks in MLlib. Application of model selection techniques using correlation matrix and Hypothesis Testing.

MLlib: Classification with Logistic Regression

In this notebook we will use Spark's machine learning library MLlib to build a Logistic Regression classifier for network attack detection. We will use the complete KDD Cup 1999 datasets in order to test Spark capabilities with large datasets.
Additionally, we will introduce two ways of performing model selection: by using a correlation matrix and by using hypothesis testing.

Creating the RDD

In [1]:
from pyspark import SparkContext
sc =SparkContext()
In [2]:
data_file = "/home/osboxes/Python with Spark - part 1/pydata/kddcup.data.gz"
raw_data = sc.textFile(data_file)

print "Train data size is {}".format(raw_data.count())
Train data size is 4898431
The KDD Cup 1999 also provides test data that we will load into a separate RDD.
In [3]:
import urllib
ft = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")
In [3]:
test_data_file = "/home/osboxes/Python with Spark - part 1/pydata/corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print "Test data size is {}".format(test_raw_data.count())
Test data size is 311029

Labeled Points

A labeled point is a local vector associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms, and the label is stored as a double. For binary classification, a label should be either 0 (negative) or 1 (positive).

Preparing the training data

In our case, we are interested in detecting network attacks in general; we don't need to detect which type of attack we are dealing with. Therefore we will tag each network interaction as non-attack (i.e. the 'normal' tag) or attack (i.e. anything but 'normal').
In [4]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

def parse_interaction(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,41]
    clean_line_split = line_split[0:1]+line_split[4:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = raw_data.map(parse_interaction)

Preparing the test data

Similarly, we process our test data file.
In [5]:
test_data = test_raw_data.map(parse_interaction)

Detecting network attacks using Logistic Regression

Logistic regression is widely used to predict a binary response. Spark implements two algorithms to solve logistic regression: mini-batch gradient descent and L-BFGS. L-BFGS is recommended over mini-batch gradient descent for faster convergence.

Training a classifier

In [6]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from time import time

# Build the model
t0 = time()
logit_model = LogisticRegressionWithLBFGS.train(training_data)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 519.229 seconds

Evaluating the model on new data

In order to measure the classification error on our test data, we use map on the test_data RDD and the model to predict each test point class.
In [7]:
labels_and_preds = test_data.map(lambda p: (p.label, logit_model.predict(p.features)))
Classification results are returned in pairs, with the actual test label and the predicted one. This is used to calculate the classification error by using filter and count as follows.
In [8]:
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 20.933 seconds. Test accuracy is 0.9164
That's a decent accuracy. We know that there is room for improvement with better variable selection and also by including the categorical variables (e.g. we have excluded 'protocol' and 'service').

Model selection

Model or feature selection helps us build more interpretable and efficient models (or classifiers, in this case). For illustrative purposes, we will follow two different approaches: correlation matrices and hypothesis testing.

Using a correlation matrix

In a previous notebook we calculated a correlation matrix in order to find predictors that are highly correlated. There are many possible choices there in order to simplify our model. We can pick different combinations of correlated variables and leave just those that represent them. The reader can try different combinations. Here we will choose the following for illustrative purposes:
  • From the description of the KDD Cup 99 task we know that the variable dst_host_same_src_port_rate references the percentage of the last 100 connections to the same port, for the same destination host. In our correlation matrix (and auxiliary dataframes) we find that this one is highly and positively correlated to src_bytes and srv_count. The former is the number of bytes sent from source to destination. The latter is the number of connections to the same service as the current connection in the past 2 seconds. We decide not to include dst_host_same_src_port_rate in our model since we include the other two.
  • Variables serror_rate and srv_serror_rate (% of connections that have SYN errors for the same host and the same service, respectively) are highly positively correlated. Moreover, the set of variables that they highly correlate with is pretty much the same. They seem to contribute very similarly to our model, so we will keep just serror_rate.
  • A similar situation happens with rerror_rate and srv_rerror_rate (% of connections that have REJ errors), so we will keep just rerror_rate.
  • The same applies to the dst_host_-prefixed counterparts of the previous variables (e.g. dst_host_srv_serror_rate).
We will stop here, although the reader can keep experimenting by removing correlated variables as before (e.g. same_srv_rate and diff_srv_rate are good candidates). The list of variables we will drop includes:
  • dst_host_same_src_port_rate (column 35).
  • srv_serror_rate (column 25).
  • srv_rerror_rate (column 27).
  • dst_host_srv_serror_rate (column 38).
  • dst_host_srv_rerror_rate (column 40).

Evaluating the new model

Let's proceed with the evaluation of our reduced model. First we need to provide training and testing datasets containing just the selected variables. For that we will define a new function to parse the raw data that keeps just what we need.
In [9]:
def parse_interaction_corr(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,25,27,35,38,40,41]
    clean_line_split = line_split[0:1]+line_split[4:25]+line_split[26:27]+line_split[28:35]+line_split[36:38]+line_split[39:40]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

corr_reduced_training_data = raw_data.map(parse_interaction_corr)
corr_reduced_test_data = test_raw_data.map(parse_interaction_corr)
Note: when selecting elements in the split, a list comprehension with a leave_out list for filtering is indeed more Pythonic than slicing and concatenation, but we have found it to be less efficient. This matters when dealing with large datasets: the parse_interaction functions are called for every element in the RDD, so we need to make them as efficient as possible.
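For reference, the more Pythonic (but, in our experience, slower) alternative based on a leave_out list would look like this sketch:

def parse_interaction_corr_lc(line):
    leave_out = [1, 2, 3, 25, 27, 35, 38, 40, 41]
    line_split = line.split(",")
    # keep every column whose index is not in leave_out
    clean_line_split = [item for i, item in enumerate(line_split) if i not in leave_out]
    attack = 1.0
    if line_split[41] == 'normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))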
Now we can train the model.
In [10]:
# Build the model
t0 = time()
logit_model_2 = LogisticRegressionWithLBFGS.train(corr_reduced_training_data)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 595.322 seconds
And evaluate its accuracy on the test data.
In [11]:
labels_and_preds = corr_reduced_test_data.map(lambda p: (p.label, logit_model_2.predict(p.features)))
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(corr_reduced_test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 20.872 seconds. Test accuracy is 0.8599
As expected, we have reduced accuracy, and in this run training time did not even improve. This doesn't seem like a good trade-off, at least not for logistic regression and the predictors we decided to leave out: we have lost quite a lot of accuracy and gained nothing in training time. Moreover, prediction time didn't improve either.

Using hypothesis testing

Hypothesis testing is a powerful tool in statistical inference and machine learning for determining whether a result is statistically significant. MLlib supports Pearson's chi-squared (χ2) tests for goodness of fit and independence. The goodness-of-fit test requires an input of type Vector, whereas the independence test requires a Matrix as input. Moreover, MLlib also supports the input type RDD[LabeledPoint] to enable feature selection via chi-squared independence tests. Again, these methods are part of the Statistics package.
In our case we want to perform some sort of feature selection, so we will provide an RDD of LabeledPoint. Internally, MLlib will calculate a contingency matrix and perform Pearson's chi-squared (χ2) test. Features need to be categorical; real-valued features are treated as categorical, with each distinct value as a category. There is a limit of 1000 distinct values, so we need either to leave out some features or categorise them. In this case, we will consider just the features that take either boolean values or just a few different numeric values in our dataset. We could overcome this limitation by defining a more complex parse_interaction function that categorises each feature properly, as sketched below.
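As a sketch of that last idea (the threshold values and helper name are illustrative, not part of the original notebooks), a real-valued rate could be bucketed into a handful of categories before running the chi-squared test:

def categorize_rate(value, thresholds=(0.25, 0.5, 0.75)):
    # Map a rate in [0, 1] to one of a few category codes
    for i, t in enumerate(thresholds):
        if value <= t:
            return float(i)
    return float(len(thresholds))

# e.g. inside a more complex parse function, for serror_rate (column 24):
# serror_rate_cat = categorize_rate(float(line_split[24]))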
In [12]:
feature_names = ["land","wrong_fragment",
             "urgent","hot","num_failed_logins","logged_in","num_compromised",
             "root_shell","su_attempted","num_root","num_file_creations",
             "num_shells","num_access_files","num_outbound_cmds",
             "is_hot_login","is_guest_login","count","srv_count","serror_rate",
             "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
             "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
             "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
             "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]
In [13]:
def parse_interaction_categorical(line):
    line_split = line.split(",")
    clean_line_split = line_split[6:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data_categorical = raw_data.map(parse_interaction_categorical)
In [14]:
from pyspark.mllib.stat import Statistics

chi = Statistics.chiSqTest(training_data_categorical)
Now we can check the resulting values after putting them into a Pandas data frame.
In [15]:
import pandas as pd
pd.set_option('display.max_colwidth', 30)

records = [(result.statistic, result.pValue) for result in chi]

chi_df = pd.DataFrame(data=records, index= feature_names, columns=["Statistic","p-value"])

chi_df 
Out[15]:
                              Statistic       p-value
land                          4.649835e-01    4.953041e-01
wrong_fragment                3.068555e+02    0.000000e+00
urgent                        3.871844e+01    2.705761e-07
hot                           1.946331e+04    0.000000e+00
num_failed_logins             1.277691e+02    0.000000e+00
logged_in                     3.273098e+06    0.000000e+00
num_compromised               2.011863e+03    0.000000e+00
root_shell                    1.044918e+03    0.000000e+00
su_attempted                  4.340000e+02    0.000000e+00
num_root                      2.287168e+04    0.000000e+00
num_file_creations            9.179739e+03    0.000000e+00
num_shells                    1.380028e+03    0.000000e+00
num_access_files              1.873477e+04    0.000000e+00
num_outbound_cmds             0.000000e+00    1.000000e+00
is_hot_login                  8.070987e+00    4.497960e-03
is_guest_login                1.350051e+04    0.000000e+00
count                         4.546398e+06    0.000000e+00
srv_count                     2.296060e+06    0.000000e+00
serror_rate                   2.684199e+05    0.000000e+00
srv_serror_rate               3.026270e+05    0.000000e+00
rerror_rate                   9.860453e+03    0.000000e+00
srv_rerror_rate               3.247639e+04    0.000000e+00
same_srv_rate                 3.999124e+05    0.000000e+00
diff_srv_rate                 3.909998e+05    0.000000e+00
srv_diff_host_rate            1.365459e+06    0.000000e+00
dst_host_count                2.520479e+06    0.000000e+00
dst_host_srv_count            1.439086e+06    0.000000e+00
dst_host_same_srv_rate        1.237932e+06    0.000000e+00
dst_host_diff_srv_rate        1.339002e+06    0.000000e+00
dst_host_same_src_port_rate   2.915195e+06    0.000000e+00
dst_host_srv_diff_host_rate   2.226291e+06    0.000000e+00
dst_host_serror_rate          4.074546e+05    0.000000e+00
dst_host_srv_serror_rate      4.550990e+05    0.000000e+00
dst_host_rerror_rate          1.364790e+05    0.000000e+00
dst_host_srv_rerror_rate      2.545474e+05    0.000000e+00
From that we conclude that predictors land and num_outbound_cmds could be removed from our model without affecting our accuracy dramatically. Let's try this.

Evaluating the new model

So the only modification to our first parse_interaction function will be to remove columns 6 and 19, corresponding to the two predictors that we do not want to be part of our model.
In [16]:
def parse_interaction_chi(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,6,19,41]
    clean_line_split = line_split[0:1] + line_split[4:6] + line_split[7:19] + line_split[20:41]
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data_chi = raw_data.map(parse_interaction_chi)
test_data_chi = test_raw_data.map(parse_interaction_chi)
Now we build the logistic regression classifier again.
In [18]:
# Build the model
t0 = time()
logit_model_chi = LogisticRegressionWithLBFGS.train(training_data_chi)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 518.452 seconds
And evaluate in test data.
In [19]:
labels_and_preds = test_data_chi.map(lambda p: (p.label, logit_model_chi.predict(p.features)))
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data_chi.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 21.726 seconds. Test accuracy is 0.9164
So we can see that, by using hypothesis testing, we have been able to remove two predictors without diminishing testing accuracy at all. Training time improved a bit as well. This might not seem like a big model reduction, but it is something when dealing with big data sources. Moreover, we should be able to categorise those five predictors we have left out for different reasons and either include them in the model or leave them out if they aren't statistically significant.
Additionally, we could try to remove some of those predictors that are highly correlated, trying not to reduce accuracy too much. In the end, we should end up with a model that is easier to understand and use.

MLlib: Decision Trees

Use of tree-based methods and how they help explaining models and feature selection.

MLlib: Decision Trees

In this notebook we will use Spark's machine learning library MLlib to build a Decision Tree classifier for network attack detection. We will use the complete KDD Cup 1999 dataset in order to test Spark capabilities with large datasets.
Decision trees are a popular machine learning tool, in part because they are easy to interpret, handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions. In this notebook, we will first train a classification tree including every single predictor. Then we will use our results to perform model selection. Once we find the most important predictors (the main splits in the tree), we will build a minimal tree using just three of them (the first two levels of the tree) in order to compare performance and accuracy.

Creating the RDD

As we said, this time we will use the complete dataset provided for the KDD Cup 1999, containing nearly five million network interactions. The file is provided as a Gzip file that we will download locally.
In [2]:
from pyspark import SparkContext
sc =SparkContext()
In [3]:
import urllib
In [2]:
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")
In [4]:
data_file = "/home/osboxes/Python with Spark - part 1/pydata/kddcup.data.gz"
raw_data = sc.textFile(data_file)

print "Train data size is {}".format(raw_data.count())
Train data size is 4898431
The KDD Cup 1999 also provides test data that we will load into a separate RDD.
In [4]:
ft = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")
In [5]:
test_data_file = "/home/osboxes/Python with Spark - part 1/pydata/corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print "Test data size is {}".format(test_raw_data.count())
Test data size is 311029

Detecting network attacks using Decision Trees

In this section we will train a classification tree that, as we did with logistic regression, will predict if a network interaction is either normal or attack.
Training a classification tree using MLlib requires some parameters:
  • Training data
  • Num classes
  • Categorical features info: a map from column index to the arity of that categorical variable. This is optional, although it should increase model accuracy. However, it requires that we know the levels of our categorical variables in advance, and that we parse our data to convert those labels to integer values within the arity range.
  • Impurity metric
  • Tree maximum depth
  • And tree maximum number of bins
In the next section we will see how to obtain all the labels within a dataset and convert them to numerical factors.

Preparing the data

As we said, in order to benefit from the tree's ability to deal seamlessly with categorical variables, we need to convert them to numerical factors. But first we need to obtain all the possible levels. We will use set transformations on a CSV-parsed RDD.
In [6]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

csv_data = raw_data.map(lambda x: x.split(","))
test_csv_data = test_raw_data.map(lambda x: x.split(","))

protocols = csv_data.map(lambda x: x[1]).distinct().collect()
services = csv_data.map(lambda x: x[2]).distinct().collect()
flags = csv_data.map(lambda x: x[3]).distinct().collect()
And now we can use these Python lists in our create_labeled_point function. If a factor level is not in the training data, we assign a special level. Remember that we cannot use the testing data for training our model, not even its factor levels. The testing data represents the unknown to us in a real case.
In [7]:
def create_labeled_point(line_split):
    # leave_out = [41]
    clean_line_split = line_split[0:41]
    
    # convert protocol to numeric categorical variable
    try: 
        clean_line_split[1] = protocols.index(clean_line_split[1])
    except:
        clean_line_split[1] = len(protocols)
        
    # convert service to numeric categorical variable
    try:
        clean_line_split[2] = services.index(clean_line_split[2])
    except:
        clean_line_split[2] = len(services)
    
    # convert flag to numeric categorical variable
    try:
        clean_line_split[3] = flags.index(clean_line_split[3])
    except:
        clean_line_split[3] = len(flags)
    
    # convert label to binary label
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
        
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = csv_data.map(create_labeled_point)
test_data = test_csv_data.map(create_labeled_point)

Training a classifier

We are now ready to train our classification tree. We will keep the maxDepth value small. This will lead to lower accuracy, but we will obtain fewer splits, so later on we can better interpret the tree. In a production system we would try to increase this value in order to find better accuracy.
In [8]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from time import time

# Build the model
t0 = time()
tree_model = DecisionTree.trainClassifier(training_data, numClasses=2, 
                                          categoricalFeaturesInfo={1: len(protocols), 2: len(services), 3: len(flags)},
                                          impurity='gini', maxDepth=4, maxBins=100)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 302.6 seconds

Evaluating the model

In order to measure the classification error on our test data, we use map on the test_data RDD and the model to predict each test point class.
In [9]:
predictions = tree_model.predict(test_data.map(lambda p: p.features))
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)
Classification results are returned in pairs, with the actual test label and the predicted one. This is used to calculate the classification error by using filter and count as follows.
In [10]:
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 32.387 seconds. Test accuracy is 0.918
NOTE: the zip transformation doesn't work properly with pySpark 1.2.1 (it does in 1.3).

Interpreting the model

Understanding our tree splits is a great exercise in order to explain our classification labels in terms of predictors and the values they take. Using the toDebugString method on our tree model we can obtain a lot of information regarding splits, nodes, etc.
In [11]:
print "Learned classification tree model:"
print tree_model.toDebugString()
Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 29 nodes
  If (feature 22 <= 74.0)
   If (feature 25 <= 0.6)
    If (feature 36 <= 0.43)
     If (feature 34 <= 0.91)
      Predict: 0.0
     Else (feature 34 > 0.91)
      Predict: 1.0
    Else (feature 36 > 0.43)
     If (feature 2 in {0.0,3.0,15.0,26.0,36.0,67.0,27.0,18.0,4.0,7.0,20.0,24.0,43.0,44.0,46.0,47.0,55.0,57.0,58.0,60.0,42.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,3.0,15.0,26.0,36.0,67.0,27.0,18.0,4.0,7.0,20.0,24.0,43.0,44.0,46.0,47.0,55.0,57.0,58.0,60.0,42.0})
      Predict: 1.0
   Else (feature 25 > 0.6)
    If (feature 3 in {7.0,4.0,9.0,2.0,3.0,10.0})
     If (feature 2 in {3.0,5.0,7.0,8.0,15.0,18.0,50.0,51.0,67.0,12.0,27.0,42.0,58.0,68.0})
      Predict: 0.0
     Else (feature 2 not in {3.0,5.0,7.0,8.0,15.0,18.0,50.0,51.0,67.0,12.0,27.0,42.0,58.0,68.0})
      Predict: 1.0
    Else (feature 3 not in {7.0,4.0,9.0,2.0,3.0,10.0})
     If (feature 38 <= 0.06)
      Predict: 0.0
     Else (feature 38 > 0.06)
      Predict: 1.0
  Else (feature 22 > 74.0)
   If (feature 5 <= 0.0)
    If (feature 11 <= 0.0)
     If (feature 31 <= 254.0)
      Predict: 1.0
     Else (feature 31 > 254.0)
      Predict: 1.0
    Else (feature 11 > 0.0)
     If (feature 2 in {12.0})
      Predict: 0.0
     Else (feature 2 not in {12.0})
      Predict: 1.0
   Else (feature 5 > 0.0)
    If (feature 29 <= 0.08)
     If (feature 4 <= 28.0)
      Predict: 1.0
     Else (feature 4 > 28.0)
      Predict: 0.0
    Else (feature 29 > 0.08)
     Predict: 1.0

For example, a network interaction with the following features (see description here) will be classified as an attack by our model:
  • count, the number of connections to the same host as the current connection in the past two seconds, being greater than 32.
  • dst_bytes, the number of data bytes from destination to source, is 0.
  • service is neither level 0 nor 52.
  • logged_in is false.
From our services list we know that:
In [12]:
print "Service 0 is {}".format(services[0])
print "Service 52 is {}".format(services[52])
Service 0 is urp_i
Service 52 is tftp_u
So we can characterise network interactions with more than 32 connections to the same server in the last 2 seconds, transferring zero bytes from destination to source, where service is neither urp_i nor tftp_u, and not logged in, as network attacks. A similar approach can be used for each tree terminal node.
We can see that count is the first node split in the tree. Remember that each partition is chosen greedily by selecting the best split from a set of possible splits, in order to maximize the information gain at a tree node (see more here). At a second level we find variables flag (normal or error status of the connection) and dst_bytes (the number of data bytes from destination to source) and so on.
This explanatory capability of a classification (or regression) tree is one of its main benefits. Understanding the data is a key factor in building better models.

Building a minimal model using the three main splits

So now that we know the main features predicting a network attack, thanks to our classification tree splits, let's use them to build a minimal classification tree with just the three main variables: count, dst_bytes, and flag.
We need to define the appropriate function to create labeled points.
In [13]:
def create_labeled_point_minimal(line_split):
    # leave_out = [41]
    clean_line_split = line_split[3:4] + line_split[5:6] + line_split[22:23]
    
    # convert flag to numeric categorical variable
    try:
        clean_line_split[0] = flags.index(clean_line_split[0])
    except:
        clean_line_split[0] = len(flags)
    
    # convert label to binary label
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
        
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data_minimal = csv_data.map(create_labeled_point_minimal)
test_data_minimal = test_csv_data.map(create_labeled_point_minimal)
We then use these to train the model.
In [14]:
# Build the model
t0 = time()
tree_model_minimal = DecisionTree.trainClassifier(training_data_minimal, numClasses=2, 
                                          categoricalFeaturesInfo={0: len(flags)},
                                          impurity='gini', maxDepth=3, maxBins=32)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))
Classifier trained in 171.338 seconds
Now we can predict on the testing data and calculate accuracy.
In [15]:
predictions_minimal = tree_model_minimal.predict(test_data_minimal.map(lambda p: p.features))
labels_and_preds_minimal = test_data_minimal.map(lambda p: p.label).zip(predictions_minimal)
In [16]:
t0 = time()
test_accuracy = labels_and_preds_minimal.filter(lambda (v, p): v == p).count() / float(test_data_minimal.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))
Prediction made in 17.446 seconds. Test accuracy is 0.9049
So we have trained a classification tree with just the three most important predictors, in half of the time, and with reasonably good accuracy. In fact, a classification tree is a very good model selection tool!

Spark SQL: structured processing for Data Analysis


In this notebook a schema is inferred for our network interactions dataset. Based on that, we use Spark SQL's DataFrame abstraction to perform a more structured exploratory data analysis.

Spark SQL and Data Frames

This notebook introduces Spark's capabilities for dealing with data in a structured way. Basically, everything revolves around the concept of a DataFrame and using the SQL language to query it. We will see how the data frame abstraction, very popular in other data analytics ecosystems (e.g. R and Python/Pandas), is very powerful when performing exploratory data analysis. In fact, it is very easy to express data queries when it is used together with the SQL language. Moreover, Spark distributes this column-based data structure transparently, in order to make the querying process as efficient as possible.

Creating the RDD

As we did in previous notebooks, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half a million network interactions. The file is provided as a Gzip file that we will download locally.
In [1]:
from pyspark import SparkContext
sc =SparkContext()
In [2]:
data_file = "/home/osboxes/Python with Spark - part 1/pydata/kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file).cache()

Getting a Data Frame

A Spark DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Pandas. DataFrames can be constructed from a wide array of sources, such as an existing RDD in our case.
The entry point into all SQL functionality in Spark is the SQLContext class. To create a basic instance, all we need is a SparkContext reference. Since we are running Spark in shell mode (using pySpark) we can use the global context object sc for this purpose.
In [3]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Inferring the schema

With a SQLContext, we are ready to create a DataFrame from our existing RDD. But first we need to tell Spark SQL the schema in our data.
Spark SQL can convert an RDD of Row objects to a DataFrame. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys define the column names, and the types are inferred by looking at the first row. Therefore, it is important that there is no missing data in the first row of the RDD in order to properly infer the schema.
In our case, we first need to split the comma separated data, and then use the information in KDD's 1999 task description to obtain the column names.
In [4]:
from pyspark.sql import Row

csv_data = raw_data.map(lambda l: l.split(","))
row_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5])
    )
)
Once we have our RDD of Row we can infer and register the schema.
In [5]:
interactions_df = sqlContext.createDataFrame(row_data)
interactions_df.registerTempTable("interactions")
Now we can run SQL queries over our data frame that has been registered as a table.
In [6]:
# Select tcp network interactions with more than 1 second duration and no transfer from destination
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show()
+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
|    5039|        0|
|    5062|        0|
|    5041|        0|
|    5056|        0|
|    5064|        0|
|    5043|        0|
|    5061|        0|
|    5049|        0|
|    5061|        0|
|    5048|        0|
|    5047|        0|
|    5044|        0|
|    5063|        0|
|    5068|        0|
|    5062|        0|
+--------+---------+
only showing top 20 rows

The results of SQL queries are RDDs and support all the normal RDD operations.
In [7]:
# Output duration together with dst_bytes
tcp_interactions_out = tcp_interactions.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
for ti_out in tcp_interactions_out.collect():
  print ti_out
Duration: 5057, Dest. bytes: 0
Duration: 5059, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5056, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5039, Dest. bytes: 0
Duration: 5062, Dest. bytes: 0
Duration: 5041, Dest. bytes: 0
Duration: 5056, Dest. bytes: 0
Duration: 5064, Dest. bytes: 0
Duration: 5043, Dest. bytes: 0
Duration: 5061, Dest. bytes: 0
Duration: 5049, Dest. bytes: 0
Duration: 5061, Dest. bytes: 0
Duration: 5048, Dest. bytes: 0
Duration: 5047, Dest. bytes: 0
Duration: 5044, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 5068, Dest. bytes: 0
Duration: 5062, Dest. bytes: 0
Duration: 5046, Dest. bytes: 0
Duration: 5052, Dest. bytes: 0
Duration: 5044, Dest. bytes: 0
Duration: 5054, Dest. bytes: 0
Duration: 5039, Dest. bytes: 0
Duration: 5058, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5032, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 5040, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5066, Dest. bytes: 0
Duration: 5044, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5036, Dest. bytes: 0
Duration: 5055, Dest. bytes: 0
Duration: 2426, Dest. bytes: 0
Duration: 5047, Dest. bytes: 0
Duration: 5057, Dest. bytes: 0
Duration: 5037, Dest. bytes: 0
Duration: 5057, Dest. bytes: 0
Duration: 5062, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5053, Dest. bytes: 0
Duration: 5064, Dest. bytes: 0
Duration: 5044, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5033, Dest. bytes: 0
Duration: 5066, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 5056, Dest. bytes: 0
Duration: 5042, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 5060, Dest. bytes: 0
Duration: 5056, Dest. bytes: 0
Duration: 5049, Dest. bytes: 0
Duration: 5043, Dest. bytes: 0
Duration: 5039, Dest. bytes: 0
Duration: 5041, Dest. bytes: 0
Duration: 42448, Dest. bytes: 0
Duration: 42088, Dest. bytes: 0
Duration: 41065, Dest. bytes: 0
Duration: 40929, Dest. bytes: 0
Duration: 40806, Dest. bytes: 0
Duration: 40682, Dest. bytes: 0
Duration: 40571, Dest. bytes: 0
Duration: 40448, Dest. bytes: 0
Duration: 40339, Dest. bytes: 0
Duration: 40232, Dest. bytes: 0
Duration: 40121, Dest. bytes: 0
Duration: 36783, Dest. bytes: 0
Duration: 36674, Dest. bytes: 0
Duration: 36570, Dest. bytes: 0
Duration: 36467, Dest. bytes: 0
Duration: 36323, Dest. bytes: 0
Duration: 36204, Dest. bytes: 0
Duration: 32038, Dest. bytes: 0
Duration: 31925, Dest. bytes: 0
Duration: 31809, Dest. bytes: 0
Duration: 31709, Dest. bytes: 0
Duration: 31601, Dest. bytes: 0
Duration: 31501, Dest. bytes: 0
Duration: 31401, Dest. bytes: 0
Duration: 31301, Dest. bytes: 0
Duration: 31194, Dest. bytes: 0
Duration: 31061, Dest. bytes: 0
Duration: 30935, Dest. bytes: 0
Duration: 30835, Dest. bytes: 0
Duration: 30735, Dest. bytes: 0
Duration: 30619, Dest. bytes: 0
Duration: 30518, Dest. bytes: 0
Duration: 30418, Dest. bytes: 0
Duration: 30317, Dest. bytes: 0
Duration: 30217, Dest. bytes: 0
Duration: 30077, Dest. bytes: 0
Duration: 25420, Dest. bytes: 0
Duration: 22921, Dest. bytes: 0
Duration: 22821, Dest. bytes: 0
Duration: 22721, Dest. bytes: 0
Duration: 22616, Dest. bytes: 0
Duration: 22516, Dest. bytes: 0
Duration: 22416, Dest. bytes: 0
Duration: 22316, Dest. bytes: 0
Duration: 22216, Dest. bytes: 0
Duration: 21987, Dest. bytes: 0
Duration: 21887, Dest. bytes: 0
Duration: 21767, Dest. bytes: 0
Duration: 21661, Dest. bytes: 0
Duration: 21561, Dest. bytes: 0
Duration: 21455, Dest. bytes: 0
Duration: 21334, Dest. bytes: 0
Duration: 21223, Dest. bytes: 0
Duration: 21123, Dest. bytes: 0
Duration: 20983, Dest. bytes: 0
Duration: 14682, Dest. bytes: 0
Duration: 14420, Dest. bytes: 0
Duration: 14319, Dest. bytes: 0
Duration: 14198, Dest. bytes: 0
Duration: 14098, Dest. bytes: 0
Duration: 13998, Dest. bytes: 0
Duration: 13898, Dest. bytes: 0
Duration: 13796, Dest. bytes: 0
Duration: 13678, Dest. bytes: 0
Duration: 13578, Dest. bytes: 0
Duration: 13448, Dest. bytes: 0
Duration: 13348, Dest. bytes: 0
Duration: 13241, Dest. bytes: 0
Duration: 13141, Dest. bytes: 0
Duration: 13033, Dest. bytes: 0
Duration: 12933, Dest. bytes: 0
Duration: 12833, Dest. bytes: 0
Duration: 12733, Dest. bytes: 0
Duration: 12001, Dest. bytes: 0
Duration: 5678, Dest. bytes: 0
Duration: 5010, Dest. bytes: 0
Duration: 1298, Dest. bytes: 0
Duration: 1031, Dest. bytes: 0
Duration: 36438, Dest. bytes: 0
We can easily have a look at our data frame schema using printSchema.
In [8]:
interactions_df.printSchema()
root
 |-- dst_bytes: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- flag: string (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- src_bytes: long (nullable = true)
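If the first row of the RDD could contain missing values, the schema can also be declared explicitly instead of being inferred from a Row. The sketch below is my own addition rather than part of the original notebook: it builds the same six columns with the types shown by printSchema above.

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Explicit schema matching the types inferred above
explicit_schema = StructType([
    StructField("duration", LongType(), True),
    StructField("protocol_type", StringType(), True),
    StructField("service", StringType(), True),
    StructField("flag", StringType(), True),
    StructField("src_bytes", LongType(), True),
    StructField("dst_bytes", LongType(), True),
])

# Plain tuples in the same column order as the schema
typed_tuples = csv_data.map(lambda p: (int(p[0]), p[1], p[2], p[3], int(p[4]), int(p[5])))
interactions_explicit_df = sqlContext.createDataFrame(typed_tuples, explicit_schema)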

Queries as DataFrame operations

Spark DataFrames provide a domain-specific language for structured data manipulation. This language includes methods we can chain in order to do selection, filtering, grouping, etc. For example, let's say we want to count how many interactions there are for each protocol type. We can proceed as follows.
In [9]:
from time import time

t0 = time()
interactions_df.select("protocol_type", "duration", "dst_bytes").groupBy("protocol_type").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
+-------------+------+
|protocol_type| count|
+-------------+------+
|          udp| 20354|
|          tcp|190065|
|         icmp|283602|
+-------------+------+

Query performed in 18.452 seconds
Now imagine that we want to count how many interactions last more than 1000 seconds, with no data transfer from the destination, grouped by protocol type. We can just add two filter calls to the previous query.
In [10]:
t0 = time()
interactions_df.select("protocol_type", "duration", "dst_bytes").filter(interactions_df.duration>1000).filter(interactions_df.dst_bytes==0).groupBy("protocol_type").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
+-------------+-----+
|protocol_type|count|
+-------------+-----+
|          tcp|  139|
+-------------+-----+

Query performed in 14.254 seconds
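For comparison (a sketch of my own, not from the original notebook), the same counts should also be reachable through the interactions table we registered earlier, using plain SQL instead of the DataFrame methods:

# Equivalent filter-and-group query against the registered "interactions" table
sqlContext.sql("""
    SELECT protocol_type, COUNT(*) AS count
    FROM interactions
    WHERE duration > 1000 AND dst_bytes = 0
    GROUP BY protocol_type
""").show()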
We can use this to perform some exploratory data analysis. Let's count how many attack and normal interactions we have. First we need to add the label column to our data.
In [11]:
# Collapse the many specific attack labels into a binary attack/normal label
def get_label_type(label):
    if label != "normal.":
        return "attack"
    else:
        return "normal"

# Rebuild the rows, this time adding a label column taken from the last field of each record
row_labeled_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5]),
    label=get_label_type(p[41])
    )
)
interactions_labeled_df = sqlContext.createDataFrame(row_labeled_data)
This time we don't need to register the DataFrame as a table, since we are going to use the object-oriented query interface instead of SQL.
Let's check that this actually works by counting the attack and normal interactions in our data frame.
In [12]:
t0 = time()
interactions_labeled_df.select("label").groupBy("label").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
+------+------+
| label| count|
+------+------+
|normal| 97278|
|attack|396743|
+------+------+

Query performed in 14.984 seconds
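As a small follow-up sketch (not part of the original notebook), the collected counts can be turned into plain Python numbers, for example to compute the overall attack fraction:

# Collect the grouped label counts and compute the share of attack interactions
label_rows = interactions_labeled_df.groupBy("label").count().collect()
label_counts = dict((r.asDict()["label"], r.asDict()["count"]) for r in label_rows)
total = float(sum(label_counts.values()))
print "Attack fraction: {0:.3f}".format(label_counts.get("attack", 0) / total)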
Now we want to count them by label and protocol type, in order to see how important the protocol type is for detecting whether an interaction is an attack or not.
In [13]:
t0 = time()
interactions_labeled_df.select("label", "protocol_type").groupBy("label", "protocol_type").count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
+------+-------------+------+
| label|protocol_type| count|
+------+-------------+------+
|attack|         icmp|282314|
|attack|          udp|  1177|
|attack|          tcp|113252|
|normal|         icmp|  1288|
|normal|          udp| 19177|
|normal|          tcp| 76813|
+------+-------------+------+

Query performed in 14.903 seconds
At first sight, it seems that udp interactions make up a lower proportion of network attacks than the other protocol types.
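To make that observation more concrete, here is a rough sketch (again, my own addition rather than part of the original notebook) that estimates the attack share per protocol from the same grouped counts:

# Attack share per protocol type, computed from the collected label/protocol counts
rows = interactions_labeled_df.groupBy("protocol_type", "label").count().collect()
totals, attacks = {}, {}
for r in rows:
    d = r.asDict()
    totals[d["protocol_type"]] = totals.get(d["protocol_type"], 0) + d["count"]
    if d["label"] == "attack":
        attacks[d["protocol_type"]] = d["count"]
for proto in sorted(totals):
    print "{0}: {1:.1%} attacks".format(proto, attacks.get(proto, 0) / float(totals[proto]))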
And we can do much more sophisticated groupings. For example, let's add to the previous grouping a "split" based on whether there is any data transfer from the target.
In [14]:
t0 = time()
interactions_labeled_df.select("label", "protocol_type", "dst_bytes").groupBy("label", "protocol_type", interactions_labeled_df.dst_bytes==0).count().show()
tt = time() - t0

print "Query performed in {} seconds".format(round(tt,3))
+------+-------------+---------------+------+
| label|protocol_type|(dst_bytes = 0)| count|
+------+-------------+---------------+------+
|normal|         icmp|           true|  1288|
|attack|          udp|           true|  1166|
|attack|          udp|          false|    11|
|normal|          tcp|           true|  9313|
|normal|          tcp|          false| 67500|
|attack|          tcp|           true|110583|
|attack|          tcp|          false|  2669|
|normal|          udp|           true|  3594|
|normal|          udp|          false| 15583|
|attack|         icmp|           true|282314|
+------+-------------+---------------+------+

Query performed in 14.469 seconds
We see how relevant this new split is for determining whether a network interaction is an attack.
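As one last illustration (a hedged sketch of my own, not taken from the original notebook), the same three-way grouping can be sorted by count using nothing but chained DataFrame methods:

# Repeat the grouping by label, protocol type and (dst_bytes == 0), ordered by count
interactions_labeled_df.select("label", "protocol_type", "dst_bytes") \
    .groupBy("label", "protocol_type", interactions_labeled_df.dst_bytes == 0) \
    .count() \
    .orderBy("count", ascending=False) \
    .show()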
We will stop here, but we can see how powerful this type of query is for exploring our data. In fact, we could replicate all the splits we saw in previous notebooks, when introducing classification trees, just by selecting, grouping, and filtering our DataFrame. For a more detailed (but less real-world) list of Spark's DataFrame operations and data sources, have a look at the official Spark SQL documentation.



