KimchiSoup(junu)

demo code

Showing 25 changed files with 110 additions and 346 deletions
{"class":"org.apache.spark.ml.PipelineModel","timestamp":1528805498147,"sparkVersion":"2.3.0","uid":"pipeline_70a068225fba","paramMap":{"stageUids":["vecIdx_c20b02d06e4a","dtr_20be5d6af4d6"]}}
{"class":"org.apache.spark.ml.feature.VectorIndexerModel","timestamp":1528805498480,"sparkVersion":"2.3.0","uid":"vecIdx_c20b02d06e4a","paramMap":{"handleInvalid":"error","maxCategories":2,"outputCol":"indexedFeatures","inputCol":"features"}}
{"class":"org.apache.spark.ml.regression.DecisionTreeRegressionModel","timestamp":1528805500043,"sparkVersion":"2.3.0","uid":"dtr_20be5d6af4d6","paramMap":{"seed":926680331,"featuresCol":"indexedFeatures","checkpointInterval":10,"maxMemoryInMB":256,"minInfoGain":0.0,"cacheNodeIds":false,"maxDepth":10,"impurity":"variance","maxBins":32,"labelCol":"is_attributed","predictionCol":"prediction","minInstancesPerNode":1},"numFeatures":9}
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.DecisionTreeRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -5,6 +12,7 @@ import javax.swing.*;
import java.awt.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
@@ -61,7 +69,9 @@ class PngPane extends JPanel {
add(label, BorderLayout.CENTER);
}
}
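// SharedArea appears intended as a holder for the Dataset shared between tabs; it is not referenced in the code shown here.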
class SharedArea{
Dataset<Row> data;
}
class CreateTable_tab extends JPanel{
public JPanel centre_pane = new JPanel();
public JPanel south_pane = new JPanel();
@@ -82,7 +92,7 @@ class CreateTable_tab extends JPanel{
private DefaultTableModel tableModel3 = new DefaultTableModel(new Object[]{"unknown"},1);
public CsvFile_chooser temp = new CsvFile_chooser();
private String current_state="100";
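// current_state drives the demo flow (inferred from the transitions below): "100" show raw CSV -> "200" show engineered features -> "300" train and predict -> "400" done.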
public CreateTable_tab(){
super();
@@ -103,13 +113,16 @@ class CreateTable_tab extends JPanel{
// sub Panel 3
pan3.setViewportView(table3);
centre_pane.add(pan3);
add(centre_pane, BorderLayout.CENTER);
// sub Panel 4
south_pane.setLayout(new FlowLayout());
south_pane.add(btn1);
btn1.addActionListener(new ActionListener() {
@Override
public void actionPerformed(ActionEvent e) {
if(temp.is_selected) {
String path = temp.selected_file.getAbsolutePath();
// 1st Column Raw Data
@@ -126,10 +139,12 @@ class CreateTable_tab extends JPanel{
TableCreator table_maker = new TableCreator();
Dataset<Row> dataset = agg.loadCSVDataSet(path, spark);
if(current_state.equals("100")){
List<String> stringDataset_Raw = dataset.toJSON().collectAsList();
String[] header_r = {"ip", "app", "device", "os", "channel", "click_time", "is_attributed"};
table1.setModel(table_maker.getTableModel(stringDataset_Raw, header_r));
current_state="200";
}else if(current_state.equals("200")){
// 2nd Column Data with features
// Adding features
dataset = agg.changeTimestempToLong(dataset);
@@ -140,6 +155,89 @@ class CreateTable_tab extends JPanel{
String[] header_f = {"ip", "app", "device", "os", "channel", "is_attributed", "click_time",
"avg_valid_click_count", "click_time_delta", "count_click_in_ten_mins"};
table2.setModel(table_maker.getTableModel(stringDataset_feat, header_f));
current_state="300";
}else if(current_state.equals("300")){
dataset = agg.changeTimestempToLong(dataset);
dataset = agg.averageValidClickCount(dataset);
dataset = agg.clickTimeDelta(dataset);
dataset = agg.countClickInTenMinutes(dataset);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{
"ip",
"app",
"device",
"os",
"channel",
"utc_click_time",
"avg_valid_click_count",
"click_time_delta",
"count_click_in_ten_mins"
})
.setOutputCol("features");
Dataset<Row> output = assembler.transform(dataset);
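// VectorIndexer treats every feature with at most maxCategories (2) distinct values as categorical and leaves the rest continuous.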
VectorIndexerModel featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(2)
.fit(output);
// Split the result into training and test sets (30% held out for testing).
// Dataset<Row>[] splits = output.randomSplit(new double[]{0.7, 0.3});
// Dataset<Row> trainingData = splits[0];
// Dataset<Row> testData = splits[1];
// Train a DecisionTree regression model.
DecisionTreeRegressor dt = new DecisionTreeRegressor()
.setFeaturesCol("indexedFeatures")
.setLabelCol("is_attributed")
.setMaxDepth(10);
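// Note: the label is binary (0/1), so the regression tree predicts the mean attribution rate of a leaf, i.e. a score in [0, 1].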
// Chain indexer and tree in a Pipeline.
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[]{featureIndexer, dt});
// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(output);
// save model
try {
model.save("./decisionTree");
} catch (IOException e1) {
e1.printStackTrace();
}
PipelineModel p_model = PipelineModel.load("./decisionTree");
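// Reloading immediately after save just demonstrates that the persisted pipeline round-trips; at serving time only the load path would run.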
// Make predictions.
Dataset<Row> predictions = p_model.transform(assembler.transform(dataset));
predictions = predictions.drop("app")
.drop("device")
.drop("os")
.drop("channel")
.drop("utc_click_time")
.drop("utc_attributed_time")
.drop("avg_valid_click_count")
.drop("click_time_delta")
.drop("count_click_in_ten_mins")
.drop("features")
.drop("indexedFeatures");
predictions.printSchema();
List<String> stringDataset_feat = predictions.toJSON().collectAsList();
String[] header_f = {"ip","is_attributed","prediction"};
table3.setModel(table_maker.getTableModel(stringDataset_feat, header_f));
current_state="400";
}
// 3rd Column Final results
@@ -148,7 +246,7 @@ class CreateTable_tab extends JPanel{
}
}
});
add(centre_pane, BorderLayout.CENTER);
add(south_pane, BorderLayout.SOUTH);
@@ -181,7 +279,7 @@ class CsvFile_chooser extends JPanel{
add(path_field);
add(browser);
browser.addActionListener(new ActionListener(){
@Override
public void actionPerformed(ActionEvent e) {
Object obj = e.getSource();
if((JButton)obj == browser){
@@ -298,3 +396,5 @@ class Aggregation {
return newDF;
}
}
package detact;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.*;
public class Aggregation {
public static void main(String[] args) {
if (args.length != 2) {
System.out.println("Usage: java -jar aggregation.jar <data_path> <result_path>");
System.exit(0);
}
String data_path = args[0];
String result_path = args[1];
//Create Session
SparkSession spark = SparkSession
.builder()
.appName("Detecting Fraud Clicks")
.master("local")
.getOrCreate();
// detact.Aggregation
Aggregation agg = new Aggregation();
Dataset<Row> dataset = Utill.loadCSVDataSet(data_path, spark);
dataset = agg.changeTimestempToLong(dataset);
dataset = agg.averageValidClickCount(dataset);
dataset = agg.clickTimeDelta(dataset);
dataset = agg.countClickInTenMinutes(dataset);
// test
// dataset.where("ip == '5348' and app == '19'").show(10);
// Save to CSV
Utill.saveCSVDataSet(dataset, result_path);
}
public Dataset<Row> changeTimestempToLong(Dataset<Row> dataset){
// cast timestamp to long
Dataset<Row> newDF = dataset.withColumn("utc_click_time", dataset.col("click_time").cast("long"));
newDF = newDF.withColumn("utc_attributed_time", dataset.col("attributed_time").cast("long"));
newDF = newDF.drop("click_time").drop("attributed_time");
return newDF;
}
public Dataset<Row> averageValidClickCount(Dataset<Row> dataset){
// window over each (ip, app) pair, ordered by utc_click_time, covering the first row up to the current row
WindowSpec w = Window.partitionBy("ip", "app")
.orderBy("utc_click_time")
.rowsBetween(Window.unboundedPreceding(), Window.currentRow());
// aggregation
Dataset<Row> newDF = dataset.withColumn("cum_count_click", count("utc_click_time").over(w));
newDF = newDF.withColumn("cum_sum_attributed", sum("is_attributed").over(w));
newDF = newDF.withColumn("avg_valid_click_count", col("cum_sum_attributed").divide(col("cum_count_click")));
newDF = newDF.drop("cum_count_click", "cum_sum_attributed");
return newDF;
}
public Dataset<Row> clickTimeDelta(Dataset<Row> dataset){
WindowSpec w = Window.partitionBy ("ip")
.orderBy("utc_click_time");
Dataset<Row> newDF = dataset.withColumn("lag(utc_click_time)", lag("utc_click_time",1).over(w));
newDF = newDF.withColumn("click_time_delta", when(col("lag(utc_click_time)").isNull(),
lit(0)).otherwise(col("utc_click_time")).minus(when(col("lag(utc_click_time)").isNull(),
lit(0)).otherwise(col("lag(utc_click_time)"))));
newDF = newDF.drop("lag(utc_click_time)");
return newDF;
}
public Dataset<Row> countClickInTenMinutes(Dataset<Row> dataset){
WindowSpec w = Window.partitionBy("ip")
.orderBy("utc_click_time")
.rangeBetween(Window.currentRow(),Window.currentRow()+600);
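// rangeBetween is expressed in units of utc_click_time (seconds): the frame spans the current click up to 600 s ahead for the same ip; minus(1) below excludes the current click from the count.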
Dataset<Row> newDF = dataset.withColumn("count_click_in_ten_mins",
(count("utc_click_time").over(w)).minus(1));
return newDF;
}
}
package detact.ML;
import detact.Aggregation;
import detact.Utill;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
import org.apache.spark.ml.regression.DecisionTreeRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// DecisionTree Model
public class DecisionTree {
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.out.println("Usage: java -jar decisionTree.jar <agg_path>");
System.exit(0);
}
String agg_path = args[0];
//Create Session
SparkSession spark = SparkSession
.builder()
.appName("Detecting Fraud Clicks")
.master("local")
.getOrCreate();
// load aggregated dataset
Dataset<Row> resultds = Utill.loadCSVDataSet(agg_path, spark);
// show Dataset schema
// System.out.println("schema start");
// resultds.printSchema();
// String[] cols = resultds.columns();
// for (String col : cols) {
// System.out.println(col);
// }
// System.out.println("schema end");
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{
"ip",
"app",
"device",
"os",
"channel",
"utc_click_time",
"avg_valid_click_count",
"click_time_delta",
"count_click_in_ten_mins"
})
.setOutputCol("features");
Dataset<Row> output = assembler.transform(resultds);
VectorIndexerModel featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(2)
.fit(output);
// Split the result into training and test sets (30% held out for testing).
Dataset<Row>[] splits = output.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Train a DecisionTree regression model.
DecisionTreeRegressor dt = new DecisionTreeRegressor()
.setFeaturesCol("indexedFeatures")
.setLabelCol("is_attributed")
.setMaxDepth(10);
// Chain indexer and tree in a Pipeline.
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[]{featureIndexer, dt});
// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);
// Make predictions.
Dataset<Row> predictions = model.transform(testData);
// Select example rows to display.
predictions.select("prediction", "is_attributed", "features").show(5);
// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
.setLabelCol("is_attributed")
.setPredictionCol("prediction")
.setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test result = " + rmse);
DecisionTreeRegressionModel treeModel =
(DecisionTreeRegressionModel) (model.stages()[1]);
System.out.println("Learned regression tree model:\n" + treeModel.toDebugString());
// save model
model.save("./decisionTree");
// load model
PipelineModel loaded_model = PipelineModel.load("./decisionTree");
// Make predictions with the reloaded model.
Dataset<Row> load_pred = loaded_model.transform(testData);
}
}
package detact;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
import org.apache.spark.ml.regression.DecisionTreeRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class Main {
public static void main(String[] args) throws Exception{
if (args.length != 1) {
System.out.println("Usage: java -jar aggregation.jar <data_path>");
System.exit(0);
}
String data_path = args[0];
//Create Session
SparkSession spark = SparkSession
.builder()
.appName("Detecting Fraud Clicks")
.master("local")
.getOrCreate();
// detact.Aggregation
Aggregation agg = new Aggregation();
Dataset<Row> dataset = Utill.loadCSVDataSet(data_path, spark);
dataset = agg.changeTimestempToLong(dataset);
dataset = agg.averageValidClickCount(dataset);
dataset = agg.clickTimeDelta(dataset);
dataset = agg.countClickInTenMinutes(dataset);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{
"ip",
"app",
"device",
"os",
"channel",
"utc_click_time",
"avg_valid_click_count",
"click_time_delta",
"count_click_in_ten_mins"
})
.setOutputCol("features");
Dataset<Row> output = assembler.transform(dataset);
VectorIndexerModel featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(2)
.fit(output);
// Split the result into training and test sets (30% held out for testing).
Dataset<Row>[] splits = output.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Train a DecisionTree regression model.
DecisionTreeRegressor dt = new DecisionTreeRegressor()
.setFeaturesCol("indexedFeatures")
.setLabelCol("is_attributed")
.setMaxDepth(10);
// Chain indexer and tree in a Pipeline.
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[]{featureIndexer, dt});
// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);
// save model
model.save("./decisionTree");
PipelineModel p_model = PipelineModel.load("./decisionTree");
// Make predictions.
Dataset<Row> predictions = p_model.transform(testData);
// Select example rows to display.
predictions.select("prediction", "is_attributed", "features").show(5);
// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
.setLabelCol("is_attributed")
.setPredictionCol("prediction")
.setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test result = " + rmse);
DecisionTreeRegressionModel treeModel =
(DecisionTreeRegressionModel) (p_model.stages()[1]);
System.out.println("Learned regression tree model:\n" + treeModel.toDebugString());
}
}
package detact;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class Utill {
public static Dataset<Row> loadCSVDataSet(String path, SparkSession spark){
// Read CSV into a Dataset
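// "com.databricks.spark.csv" is aliased to the built-in CSV source in Spark 2.x, so format("csv") would be equivalent.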
return spark.read().format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load(path);
}
public static void saveCSVDataSet(Dataset<Row> dataset, String path){
// Write Dataset to CSV ("inferSchema" applies only to reads, so it is dropped here)
dataset.write().format("com.databricks.spark.csv")
.option("header", "true")
.save(path);
}
}