
Java-Cesco/Detecting_fraud_clicks#13-1

// ===== Aggregation.java =====

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import java.util.ArrayList;
import java.util.List;
import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.when;
public class Aggregation {
public static void main(String[] args) throws Exception {
//Create Session
SparkSession spark = SparkSession
.builder()
.appName("Detecting Fraud Clicks")
.master("local")
.getOrCreate();
// Aggregation
Aggregation agg = new Aggregation();
Dataset<Row> dataset = agg.loadCSVDataSet("/home/chris/.kaggle/competitions/talkingdata-adtracking-fraud-detection/mnt/ssd/kaggle-talkingdata2/competition_files/train_sample.csv", spark);
dataset = agg.changeTimestampToLong(dataset);
dataset = agg.averageValidClickCount(dataset);
dataset = agg.clickTimeDelta(dataset);
dataset = agg.countClickInTenMinutes(dataset);
long start = System.currentTimeMillis();
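// Row.toString() renders each row as "[v1,v2,...]", so the loop below strips the brackets
// and splits on commas to build one String[] of feature values per row for the GUI table.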
List<String> logs_with_features = dataset.map((MapFunction<Row, String>) row -> row.toString(), Encoders.STRING()).collectAsList();
String[][] contents = new String[(int)dataset.count()][11];
for (int i =0; i<logs_with_features.size();i++){
String str_to_split = logs_with_features.get(i);
String[] tmp = str_to_split.substring(1,str_to_split.length()-1).split(",");
contents[i] = tmp;
}
long end = System.currentTimeMillis();
System.out.println("JK's Procedure time elapsed : " + (end-start)/1000.0);
start = System.currentTimeMillis();
List<String> stringDataset = dataset.toJSON().collectAsList();
end = System.currentTimeMillis();
System.out.println("Steve's Procedure 1 time elapsed : " + (end-start)/1000.0);
new GUI(stringDataset, contents);
}
private Dataset<Row> loadCSVDataSet(String path, SparkSession spark){
// Read CSV into a Dataset
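// "inferSchema" makes Spark scan the file to detect column types; "header" uses the first line as column names.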
return spark.read().format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(path);
}
private Dataset<Row> changeTimestampToLong(Dataset<Row> dataset){
// cast timestamp to long
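// casting a Spark timestamp column to long yields seconds since the Unix epoch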
Dataset<Row> newDF = dataset.withColumn("utc_click_time", dataset.col("click_time").cast("long"));
newDF = newDF.withColumn("utc_attributed_time", dataset.col("attributed_time").cast("long"));
newDF = newDF.drop("click_time").drop("attributed_time");
return newDF;
}
private Dataset<Row> averageValidClickCount(Dataset<Row> dataset){
// window partitioned by 'ip' and 'app', ordered by 'utc_click_time', covering rows from the first row up to the current row
WindowSpec w = Window.partitionBy("ip", "app")
.orderBy("utc_click_time")
.rowsBetween(Window.unboundedPreceding(), Window.currentRow());
// aggregation
Dataset<Row> newDF = dataset.withColumn("cum_count_click", count("utc_click_time").over(w));
newDF = newDF.withColumn("cum_sum_attributed", sum("is_attributed").over(w));
newDF = newDF.withColumn("avg_valid_click_count", col("cum_sum_attributed").divide(col("cum_count_click")));
newDF = newDF.drop("cum_count_click", "cum_sum_attributed");
return newDF;
}
private Dataset<Row> clickTimeDelta(Dataset<Row> dataset){
WindowSpec w = Window.partitionBy ("ip")
.orderBy("utc_click_time");
Dataset<Row> newDF = dataset.withColumn("lag(utc_click_time)", lag("utc_click_time",1).over(w));
newDF = newDF.withColumn("click_time_delta", when(col("lag(utc_click_time)").isNull(),
lit(0)).otherwise(col("utc_click_time")).minus(when(col("lag(utc_click_time)").isNull(),
lit(0)).otherwise(col("lag(utc_click_time)"))));
newDF = newDF.drop("lag(utc_click_time)");
return newDF;
}
private Dataset<Row> countClickInTenMinutes(Dataset<Row> dataset){
WindowSpec w = Window.partitionBy("ip")
.orderBy("utc_click_time")
.rangeBetween(Window.currentRow(),Window.currentRow()+600);
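// utc_click_time is in epoch seconds, so rangeBetween(currentRow, currentRow + 600) covers this click
// and all later clicks from the same ip within the next 10 minutes.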
Dataset<Row> newDF = dataset.withColumn("count_click_in_ten_mins",
(count("utc_click_time").over(w)).minus(1)); //TODO 본인것 포함할 것인지 정해야함.
return newDF;
}
}
// ===== GUI.java =====
import javax.swing.*;
import javax.swing.table.DefaultTableModel;
import java.awt.BorderLayout;
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
public class GUI extends JFrame {
JTabbedPane tab = new JTabbedPane();
public GUI(List<String> q, String[][] data) {
super("CESCO");
tab.addTab("png", new PngPane());
tab.addTab("gif", new GifPane());
tab.addTab("jpg", new JpgPane());
tab.addTab("table", new createTable(q));
tab.addTab("processed_features", new createTable_alter(data));
add(tab);
setSize(800, 500); // window size: width x height
setVisible(true); // true to show the window, false to hide it
setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); // exit the application when the X button is clicked
}
// public static void main(String args[]) {
// new GUI();
// }
}
class PngPane extends JPanel {
public PngPane() {
super();
ImageIcon image = new ImageIcon("data/model.png");
JLabel label = new JLabel("", image, JLabel.CENTER);
setLayout(new BorderLayout());
add(label, BorderLayout.CENTER);
}
}
class GifPane extends JPanel {
public GifPane() {
super();
ImageIcon image = new ImageIcon("data/model.gif");
JLabel label = new JLabel("", image, JLabel.CENTER);
setLayout(new BorderLayout());
add(label, BorderLayout.CENTER);
}
}
class JpgPane extends JPanel {
public JpgPane() {
super();
ImageIcon image = new ImageIcon("data/model.jpg");
JLabel label = new JLabel("", image, JLabel.CENTER);
setLayout(new BorderLayout());
add(label, BorderLayout.CENTER);
}
}
class createTable_alter extends JPanel{
private String[] header = {"ip","app","device","os","channel","is_attributed","utc_click_time","utc_attributed_time",
"avg_valid_click_count","click_time_delta","count_click_in_ten_mins"};
/*
root
|-- ip: integer (nullable = true)
|-- app: integer (nullable = true)
|-- device: integer (nullable = true)
|-- os: integer (nullable = true)
|-- channel: integer (nullable = true)
|-- is_attributed: integer (nullable = true)
|-- utc_click_time: long (nullable = true)
|-- utc_attributed_time: long (nullable = true)
|-- avg_valid_click_count: double (nullable = true)
|-- click_time_delta: long (nullable = true)
|-- count_click_in_ten_mins: long (nullable = false)
*/
public createTable_alter(String[][] data){
JTable processed_table = new JTable(data, header);
JScrollPane jScrollPane = new JScrollPane(processed_table);
add(jScrollPane);
}
}
class createTable extends JPanel {
long start = System.currentTimeMillis();
public createTable(List<String> data) { // constructor: build and display the table
getTableModel(data);
}
private DefaultTableModel getTableModel(List<String> data) {
String column_n[]={"ip","app","device","os","channel","is_attributed","click_time",
"avg_valid_click_count","click_time_delta","count_click_in_tenmin"};
Object tabledata[][]={};
DefaultTableModel model = new DefaultTableModel(tabledata,column_n);
JTable jtable = new JTable(model);
JScrollPane jScrollPane = new JScrollPane(jtable);
add(jScrollPane);
try {
for(int i =0; i<data.size();i++){
BufferedReader reader = getFileReader(data.get(i));
String line = reader.readLine();
line = line.replace("\"", "");
line = line.replace("_", "");
//line = line.replace("\\{","");
line = line.replaceAll("\\{|\\}","");
line = line.replaceAll("\\w+:", "");
//System.out.println(line);
Object [] temp= line.split(",");
model.addRow(temp);
reader.close();
}
} catch (Exception e) {
System.out.println(e);
}
long end = System.currentTimeMillis();
System.out.println("Steve's Procedure2 time elapsed : " + (end-start)/1000.0);
return model;
}
private BufferedReader getFileReader(String data) {
BufferedReader reader = new BufferedReader(new StringReader(data));
// In your real application the data would come from a file
//Reader reader = new BufferedReader( new FileReader(...) );
return reader;
}
}
// ===== calForwardTimeDelta.java =====
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.*;
public class calForwardTimeDelta {
static SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Cesco");
static JavaSparkContext sc = new JavaSparkContext(conf);
public static void main(String[] args) throws Exception{
//Create Session
SparkSession spark = SparkSession
.builder()
.appName("Detecting Fraud Clicks")
.getOrCreate();
//run methods here
calcDelta(spark);
}
private static void calcDelta(SparkSession spark){
// path to the CSV file to process
String filepath = "/home/chris/.kaggle/competitions/talkingdata-adtracking-fraud-detection/mnt/ssd/kaggle-talkingdata2/competition_files/train_sample.csv";
// create Dataset from files
Dataset<Row> logDF = spark.read()
.format("csv")
.option("inferSchema", "true")
.option("header","true")
.load(filepath);
// cast timestamp(click_time, attributed_time) type to long type
//add column for long(click_time)
Dataset<Row> newDF = logDF.withColumn("utc_click_time", logDF.col("click_time").cast("long"));
//add column for long(attributed_time)
newDF = newDF.withColumn("utc_attributed_time", logDF.col("attributed_time").cast("long"));
//drop timestamp type columns
newDF = newDF.drop("click_time").drop("attributed_time");
newDF.createOrReplaceTempView("logs");
WindowSpec w = Window.partitionBy ("ip")
.orderBy("utc_click_time");
newDF = newDF.withColumn("lag(utc_click_time)", lag("utc_click_time",1).over(w));
newDF.where("ip=10").show();
newDF = newDF.withColumn("delta", when(col("lag(utc_click_time)").isNull(),lit(0)).otherwise(col("utc_click_time")).minus(when(col("lag(utc_click_time)").isNull(),lit(0)).otherwise(col("lag(utc_click_time)"))));
//newDF = newDF.withColumn("delta", datediff());
newDF = newDF.drop("lag(utc_click_time)");
newDF = newDF.orderBy("ip");
newDF.show();
}
}