신은섭(Shin Eun Seop)

Merge branch 'feature/#3' into feature/tenMinsHG

File mode changed
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="MarkdownExportedFiles">
<htmlFiles />
<imageFiles />
<otherFiles />
</component>
</project>
\ No newline at end of file
@@ -8,7 +8,17 @@
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="false" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file:///tmp" />
</component>
</project>
\ No newline at end of file
File mode changed
File mode changed
@@ -31,7 +31,20 @@
<artifactId>spark-csv_2.11</artifactId>
<version>1.5.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
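With maven-compiler-plugin pinned to Java 8 as above, the Spark jobs in this branch can be built and launched in the usual Maven way, for example (the jar name depends on the project's artifactId and version, which are not shown in this hunk):

mvn package
spark-submit --class Aggregation --master local target/<artifact>.jar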
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.*;
public class Aggregation {
public static void main(String[] args) throws Exception {
//Create Session
SparkSession spark = SparkSession
.builder()
.appName("Detecting Fraud Clicks")
.master("local")
.getOrCreate();
Aggregation agg = new Aggregation();
Dataset<Row> dataset = agg.loadCSVDataSet("./train_sample.csv", spark);
dataset = agg.changeTimestampToLong(dataset);
dataset = agg.averageValidClickCount(dataset);
dataset = agg.clickTimeDelta(dataset);
dataset.where("ip == '5348' and app == '19'").show();
}
private Dataset<Row> loadCSVDataSet(String path, SparkSession spark){
// Read CSV into a Dataset, inferring the schema from the header row
Dataset<Row> dataset = spark.read().format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(path);
return dataset;
}
private Dataset<Row> changeTimestampToLong(Dataset<Row> dataset){
// cast timestamp to long
Dataset<Row> newDF = dataset.withColumn("utc_click_time", dataset.col("click_time").cast("long"));
newDF = newDF.withColumn("utc_attributed_time", dataset.col("attributed_time").cast("long"));
newDF = newDF.drop("click_time").drop("attributed_time");
return newDF;
}
private Dataset<Row> averageValidClickCount(Dataset<Row> dataset){
// window partitioned by 'ip' and 'app', ordered by 'utc_click_time', frame from the first row up to the current row
WindowSpec w = Window.partitionBy("ip", "app")
.orderBy("utc_click_time")
.rowsBetween(Window.unboundedPreceding(), Window.currentRow());
// aggregation
Dataset<Row> newDF = dataset.withColumn("cum_count_click", count("utc_click_time").over(w));
newDF = newDF.withColumn("cum_sum_attributed", sum("is_attributed").over(w));
newDF = newDF.withColumn("avg_valid_click_count", col("cum_sum_attributed").divide(col("cum_count_click")));
newDF = newDF.drop("cum_count_click", "cum_sum_attributed");
return newDF;
}
private Dataset<Row> clickTimeDelta(Dataset<Row> dataset){
// window over each 'ip', ordered by click time, to access the previous click
WindowSpec w = Window.partitionBy("ip")
.orderBy("utc_click_time");
// previous click time from the same ip (null for the first click)
Dataset<Row> newDF = dataset.withColumn("lag(utc_click_time)", lag("utc_click_time",1).over(w));
// delta to the previous click; 0 when there is no previous click
newDF = newDF.withColumn("click_time_delta", when(col("lag(utc_click_time)").isNull(),
lit(0)).otherwise(col("utc_click_time")).minus(when(col("lag(utc_click_time)").isNull(),
lit(0)).otherwise(col("lag(utc_click_time)"))));
newDF = newDF.drop("lag(utc_click_time)");
return newDF;
}
}
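As a quick sanity check of clickTimeDelta, the same window logic can be applied to a tiny hand-built dataset; coalesce over the lagged column is an equivalent, slightly shorter form of the when/otherwise pair used above. This is only a sketch (the class name and sample values are made up, not part of the commit):

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.*;
public class ClickTimeDeltaCheck {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("ClickTimeDeltaCheck").master("local").getOrCreate();
// three clicks from the same ip, 60 and 340 seconds apart
StructType schema = new StructType(new StructField[]{
new StructField("ip", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("utc_click_time", DataTypes.LongType, false, Metadata.empty())
});
Dataset<Row> clicks = spark.createDataFrame(Arrays.asList(
RowFactory.create(10, 1510044600L),
RowFactory.create(10, 1510044660L),
RowFactory.create(10, 1510045000L)
), schema);
WindowSpec w = Window.partitionBy("ip").orderBy("utc_click_time");
// expected click_time_delta values: 0, 60, 340
clicks.withColumn("click_time_delta",
coalesce(col("utc_click_time").minus(lag("utc_click_time", 1).over(w)), lit(0L)))
.show();
}
}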
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;
public class AvgAdvTime {
public static void main(String[] args) throws Exception {
// Start Spark Session
SparkSession spark = SparkSession
.builder()
.master("local")
.appName("Java Spark SQL basic example")
.getOrCreate();
// Read CSV into a Dataset
Dataset<Row> df = spark.read().format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load("train_sample.csv");
// cast timestamp to long
Dataset<Row> newdf = df.withColumn("utc_click_time", df.col("click_time").cast("long"));
newdf = newdf.withColumn("utc_attributed_time", df.col("attributed_time").cast("long"));
newdf = newdf.drop("click_time").drop("attributed_time");
// window partitioned by 'ip' and 'app', ordered by 'utc_click_time', frame from the first row up to the current row
WindowSpec w = Window.partitionBy("ip", "app")
.orderBy("utc_click_time")
.rowsBetween(Window.unboundedPreceding(), Window.currentRow());
// aggregation
newdf = newdf.withColumn("cum_count_click", count("utc_click_time").over(w));
newdf = newdf.withColumn("cum_sum_attributed", sum("is_attributed").over(w));
newdf = newdf.withColumn("avg_efficient", col("cum_sum_attributed").divide(col("cum_count_click")));
// print example
newdf.where("ip == '5348' and app == '19'").show();
newdf.printSchema();
}
}
\ No newline at end of file
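The running ratio computed above from cum_sum_attributed and cum_count_click can also be obtained in one step, because avg over a window frame is exactly the sum of the non-null values divided by their count. A minimal alternative for the same w and newdf defined in AvgAdvTime, assuming is_attributed is the 0/1 label from the sample data (this would also need import static org.apache.spark.sql.functions.avg):

// equivalent to cum_sum_attributed / cum_count_click over the same frame
newdf = newdf.withColumn("avg_efficient", avg("is_attributed").over(w));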
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
/**
* Utility class that gathers helper methods for working with Calendar objects.
*
* @author croute
* @since 2011.02.10
*/
public class DateUtil
{
/**
* Converts a Calendar object to a string in yyyy-MM-dd HH:mm:ss format.
*
* @param cal the Calendar object
* @return the formatted string
*/
public static String StringFromCalendar(Calendar cal)
{
// format the date as a string for transmission
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return formatter.format(cal.getTime());
}
/**
* Converts a Calendar object to a string in yyyy-MM-dd format.
*
* @param cal the Calendar object
* @return the formatted string
*/
public static String StringSimpleFromCalendar(Calendar cal)
{
// format the date as a string for transmission
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
return formatter.format(cal.getTime());
}
/**
* Converts a string in yyyy-MM-dd HH:mm:ss format to a Calendar object.
* If parsing fails, the current date is returned.
*
* @param date the date string
* @return the resulting Calendar object
*/
public static Calendar CalendarFromString(String date)
{
if (date.length() == 0)
return null;
Calendar cal = Calendar.getInstance();
try
{
//String oldstring = "2011-01-18 00:00:00.0";
// Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").parse(oldstring);
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
cal.setTime(formatter.parse(date));
}
catch(ParseException e)
{
e.printStackTrace();
}
return cal;
}
/**
* Converts a string in yyyy-MM-dd format to a Calendar object.
* If parsing fails, the current date is returned.
*
* @param date the date string
* @return the resulting Calendar object
*/
public static Calendar CalendarFromStringSimple(String date)
{
Calendar cal = Calendar.getInstance();
try
{
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
cal.setTime(formatter.parse(date));
}
catch(ParseException e)
{
e.printStackTrace();
}
return cal;
}
}
\ No newline at end of file
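A small usage sketch for DateUtil (not part of this commit). One caveat worth noting: SimpleDateFormat is not thread-safe, so creating a fresh formatter inside each call, as the class does above, is the safe pattern if these helpers are ever used inside Spark tasks.

import java.util.Calendar;
public class DateUtilExample {
public static void main(String[] args) {
// round-trip: string -> Calendar -> string
Calendar cal = DateUtil.CalendarFromString("2017-11-07 09:30:38");
System.out.println(DateUtil.StringFromCalendar(cal)); // 2017-11-07 09:30:38
System.out.println(DateUtil.StringSimpleFromCalendar(cal)); // 2017-11-07
}
}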
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import scala.Serializable;
import scala.Tuple2;
import java.util.*;
//ip,app,device,os,channel,click_time,attributed_time,is_attributed
//87540,12,1,13,497,2017-11-07 09:30:38,,0
class Record implements Serializable {
Integer ip;
Integer app;
Integer device;
Integer os;
Integer channel;
Calendar clickTime;
Calendar attributedTime;
Boolean isAttributed;
Integer clickInTenMins;
// constructors (fields are accessed directly, no getters/setters)
public Record(int pIp, int pApp, int pDevice, int pOs, int pChannel, Calendar pClickTime, Calendar pAttributedTime, boolean pIsAttributed) {
this(pIp, pApp, pDevice, pOs, pChannel, pClickTime, pAttributedTime, pIsAttributed, 0);
}
public Record(int pIp, int pApp, int pDevice, int pOs, int pChannel, Calendar pClickTime, Calendar pAttributedTime, boolean pIsAttributed, int pClickInTenMins) {
ip = pIp;
app = pApp;
device = pDevice;
os = pOs;
channel = pChannel;
clickTime = pClickTime;
attributedTime = pAttributedTime;
isAttributed = pIsAttributed;
clickInTenMins = pClickInTenMins;
}
}
class RecordComparator implements Comparator<Record> {
@Override
public int compare(Record v1 , Record v2) {
// if(a.ano < b.ano) return -1;
// else if(a.ano == b.ano) return 0;
// else return 1;
if (v1.ip.compareTo(v2.ip) == 0) {
return v1.clickTime.compareTo(v2.clickTime);
}
return v1.ip.compareTo(v2.ip);
}
}
public class MapExample {
static SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Cesco");
static JavaSparkContext sc = new JavaSparkContext(conf);
static SQLContext sqlContext = new SQLContext(sc);
public static void main(String[] args) throws Exception {
JavaRDD<String> file = sc.textFile("/Users/hyeongyunmun/Dropbox/DetectFraudClick/data/train.csv", 1);
final String header = file.first();
JavaRDD<String> data = file.filter(line -> !line.equalsIgnoreCase(header));
JavaRDD<Record> records = data.map(line -> {
String[] fields = line.split(",");
Record sd = new Record(Integer.parseInt(fields[0]), Integer.parseInt(fields[1]), Integer.parseInt(fields[2]), Integer.parseInt(fields[3]), Integer.parseInt(fields[4]), DateUtil.CalendarFromString(fields[5]), DateUtil.CalendarFromString(fields[6]), "1".equalsIgnoreCase(fields[7].trim()));
return sd;
});
// JavaRDD<Tuple4<Integer,Double,Long,Integer>> secondSortRDD = firstSortRDD.keyBy(new Function<Tuple4<Integer, Double, Long, Integer>, Tuple2<Double, Long>>(){
// @Override
// public Tuple2<Double, Long> call(Tuple4<Integer, Double, Long, Integer> value) throws Exception {
// return new Tuple2(value._2(),value._3());
// }}).sortByKey(new TupleComparator()).values();
JavaRDD<Record> firstSorted = records.sortBy(new Function<Record, Calendar>() {
@Override
public Calendar call(Record record) throws Exception {
return record.clickTime;
}
}, true, 1);
JavaRDD<Record> sortedRecords = firstSorted.sortBy(new Function<Record, Integer>() {
@Override
public Integer call(Record record) throws Exception {
return record.ip.intValue();
}
}, true, 1);
/*
// tried to sort by both keys at once, but it did not work
JavaRDD<Record> sortedRecords = records.keyBy(new Function<Record, Record>(){
@Override
public Record call(Record record) throws Exception {
return new Record(record.ip, record.app, record.device, record.os, record.channel, record.clickTime, record.attributedTime, record.isAttributed);
}}).sortByKey(new RecordComparator()).values();
*/
// System.out.println("sortedRecords");
// sortedRecords.foreach(record -> {System.out.println(record.ip + " " + record.clickTime.getTime());});
// System.out.println("make result");
/*
// tried to look ahead to the next record inside map, but it did not work
JavaRDD<Record> result = sortedRecords.map(record -> {
System.out.println("make addTen");
Calendar addTen = Calendar.getInstance();
addTen.setTime(record.clickTime.getTime());
addTen.add(Calendar.MINUTE, 10);
System.out.println("make count");
int count = 0;
for (Record temp: sortedRecords.collect()) {
if (temp.ip.compareTo(record.ip) == 0 && temp.clickTime.compareTo(record.clickTime) > 0 && temp.clickTime.compareTo(addTen)< 0)
count++;
}
return new Record(record.ip, record.app, record.device, record.os, record.channel, record.clickTime, record.attributedTime, record.isAttributed, count);
});
*/
// System.out.println("result");
// result.foreach(record -> {System.out.println(record.ip + " " + record.clickTime.getTime());});
/*
for (final ListIterator<String> it = list.listIterator(); it.hasNext();) {
final String s = it.next();
System.out.println(it.previousIndex() + ": " + s);
}
for (ListIterator<Record> it = sortedRecords.collect().listIterator(); it.hasNext(); it = it.nextIndex()) {
it.
if (temp.ip.compareTo(record.ip) == 0 && temp.clickTime.compareTo(record.clickTime) > 0 && temp.clickTime.compareTo(addTen)< 0)
count++;
}
*/
List<Record> list = sortedRecords.collect();
List<Record> resultList = new ArrayList<Record>();
for (int i = 0; i < list.size(); i++) {
//System.out.println(list.get(i).ip);
Record record = list.get(i);
Calendar addTen = Calendar.getInstance();
addTen.setTime(record.clickTime.getTime());
addTen.add(Calendar.MINUTE, 10);
int count = 0;
for (int j = i+1; j < list.size() && list.get(j).ip.compareTo(record.ip) == 0
&& list.get(j).clickTime.compareTo(record.clickTime) > 0 &&list.get(j).clickTime.compareTo(addTen) < 0; j++)
count++;
resultList.add(new Record(record.ip, record.app, record.device, record.os, record.channel, record.clickTime, record.attributedTime, record.isAttributed, count));
}
JavaRDD<Record> result = sc.parallelize(resultList);
result.foreach(record -> {System.out.println(record.ip + " " + record.clickTime.getTime() + " " + record.clickInTenMins);});
}
}
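The clickInTenMins value that the loop above computes on the driver can also be expressed with a Dataset window frame over the epoch-second click time, which keeps the work distributed instead of collecting the whole RDD. A sketch under those assumptions (the class and column names here are made up; the frame counts clicks from the same ip falling 1 to 600 seconds after the current click, which matches the loop's strict greater-than comparison but, unlike the loop, includes a click at exactly 600 seconds):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
public class TenMinWindowSketch {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("TenMinWindowSketch").master("local").getOrCreate();
Dataset<Row> df = spark.read().format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load("train_sample.csv");
// epoch seconds, as in the other classes
df = df.withColumn("utc_click_time", df.col("click_time").cast("long"));
// frame: clicks from the same ip between 1 and 600 seconds after the current click
WindowSpec tenMins = Window.partitionBy("ip")
.orderBy("utc_click_time")
.rangeBetween(1, 600);
df = df.withColumn("click_in_ten_mins", count(col("utc_click_time")).over(tenMins));
df.orderBy("ip", "utc_click_time").show();
}
}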
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.*;
public class calForwardTimeDelta {
static SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Cesco");
static JavaSparkContext sc = new JavaSparkContext(conf);
public static void main(String[] args) throws Exception{
//Create Session
SparkSession spark = SparkSession
.builder()
.appName("Detecting Fraud Clicks")
.getOrCreate();
//run methods here
calcDelta(spark);
}
private static void calcDelta(SparkSession spark){
// path of the input file to process
String filepath = "train_sample.csv";
// create Dataset from files
Dataset<Row> logDF = spark.read()
.format("csv")
.option("inferSchema", "true")
.option("header","true")
.load(filepath);
// cast timestamp(click_time, attributed_time) type to long type
//add column for long(click_time)
Dataset<Row> newDF = logDF.withColumn("utc_click_time", logDF.col("click_time").cast("long"));
//add column for long(attributed_time)
newDF = newDF.withColumn("utc_attributed_time", logDF.col("attributed_time").cast("long"));
//drop timestamp type columns
newDF = newDF.drop("click_time").drop("attributed_time");
newDF.createOrReplaceTempView("logs");
WindowSpec w = Window.partitionBy ("ip")
.orderBy("utc_click_time");
newDF = newDF.withColumn("lag(utc_click_time)", lag("utc_click_time",1).over(w));
newDF.where("ip=10").show();
newDF = newDF.withColumn("delta", when(col("lag(utc_click_time)").isNull(),lit(0)).otherwise(col("utc_click_time")).minus(when(col("lag(utc_click_time)").isNull(),lit(0)).otherwise(col("lag(utc_click_time)"))));
//newDF = newDF.withColumn("delta", datediff());
newDF = newDF.drop("lag(utc_click_time)");
newDF = newDF.orderBy("ip");
newDF.show();
}
}
public class valid {
private int x;
valid() {
x = 0;
}
void printX(){
System.out.println(x);
}
public static void main(String[] args){
valid v = new valid();
v.printX();
}
}
File mode changed