신은섭(Shin Eun Seop)

Add Utill class with static helpers to load and save CSV datasets

...@@ -22,7 +22,7 @@ public class Aggregation { ...@@ -22,7 +22,7 @@ public class Aggregation {
22 // Aggregation 22 // Aggregation
23 Aggregation agg = new Aggregation(); 23 Aggregation agg = new Aggregation();
24 24
25 - Dataset<Row> dataset = agg.loadCSVDataSet("./train_sample.csv", spark); 25 + Dataset<Row> dataset = Utill.loadCSVDataSet("./train_sample.csv", spark);
26 dataset = agg.changeTimestempToLong(dataset); 26 dataset = agg.changeTimestempToLong(dataset);
27 dataset = agg.averageValidClickCount(dataset); 27 dataset = agg.averageValidClickCount(dataset);
28 dataset = agg.clickTimeDelta(dataset); 28 dataset = agg.clickTimeDelta(dataset);
...@@ -31,16 +31,7 @@ public class Aggregation { ...@@ -31,16 +31,7 @@ public class Aggregation {
31 //test 31 //test
32 dataset.where("ip == '5348' and app == '19'").show(10); 32 dataset.where("ip == '5348' and app == '19'").show(10);
33 33
34 - agg.saveCSVDataSet(dataset, "./agg_data"); 34 + Utill.saveCSVDataSet(dataset, "./agg_data");
35 - }
36 -
37 -
38 - private Dataset<Row> loadCSVDataSet(String path, SparkSession spark){
39 - // Read SCV to DataSet
40 - return spark.read().format("csv")
41 - .option("inferSchema", "true")
42 - .option("header", "true")
43 - .load(path);
44 } 35 }
45 36
46 private Dataset<Row> changeTimestempToLong(Dataset<Row> dataset){ 37 private Dataset<Row> changeTimestempToLong(Dataset<Row> dataset){
...@@ -87,12 +78,4 @@ public class Aggregation { ...@@ -87,12 +78,4 @@ public class Aggregation {
87 return newDF; 78 return newDF;
88 } 79 }
89 80
90 - private void saveCSVDataSet(Dataset<Row> dataset, String path){
91 - // Read SCV to DataSet
92 - dataset.repartition(1)
93 - .write().format("csv")
94 - .option("inferSchema", "true")
95 - .option("header", "true")
96 - .save(path);
97 - }
98 } 81 }
......
...@@ -24,14 +24,15 @@ import java.util.*; ...@@ -24,14 +24,15 @@ import java.util.*;
24 24
25 public class MapExample { 25 public class MapExample {
26 26
27 - static SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Cesco");
28 - static JavaSparkContext sc = new JavaSparkContext(conf);
29 - static SQLContext sqlContext = new SQLContext(sc);
30 -
31 public static void main(String[] args) throws Exception { 27 public static void main(String[] args) throws Exception {
32 28
33 // Automatically identify categorical features, and index them. 29 // Automatically identify categorical features, and index them.
34 // Set maxCategories so features with > 4 distinct values are treated as continuous. 30 // Set maxCategories so features with > 4 distinct values are treated as continuous.
31 +
32 + Aggregation agg = new Aggregation();
33 +
34 + agg.
35 +
35 Dataset<Row> resultds = sqlContext.createDataFrame(result); 36 Dataset<Row> resultds = sqlContext.createDataFrame(result);
36 37
37 System.out.println("schema start"); 38 System.out.println("schema start");
......
1 +import org.apache.spark.sql.Dataset;
2 +import org.apache.spark.sql.Row;
3 +import org.apache.spark.sql.SparkSession;
4 +
5 +public class Utill {
6 +
7 + public static Dataset<Row> loadCSVDataSet(String path, SparkSession spark){
8 + // Read SCV to DataSet
9 + return spark.read().format("csv")
10 + .option("inferSchema", "true")
11 + .option("header", "true")
12 + .load(path);
13 + }
14 +
15 + public static void saveCSVDataSet(Dataset<Row> dataset, String path){
16 + // Read SCV to DataSet
17 + dataset.repartition(1)
18 + .write().format("csv")
19 + .option("inferSchema", "true")
20 + .option("header", "true")
21 + .save(path);
22 + }
23 +}