Ma Suhyeon

Implement region stats

...@@ -14,6 +14,7 @@ type Config struct { ...@@ -14,6 +14,7 @@ type Config struct {
14 Password string `json:"password"` 14 Password string `json:"password"`
15 } `json:"database"` 15 } `json:"database"`
16 TokenSecret string `json:"token_secret"` 16 TokenSecret string `json:"token_secret"`
17 + PythonBin string `json:"python_bin"`
17 } 18 }
18 19
19 func LoadConfig(path string) (Config, error) { 20 func LoadConfig(path string) (Config, error) {
...@@ -24,5 +25,9 @@ func LoadConfig(path string) (Config, error) { ...@@ -24,5 +25,9 @@ func LoadConfig(path string) (Config, error) {
24 err = json.Unmarshal(data, &config) 25 err = json.Unmarshal(data, &config)
25 } 26 }
26 27
28 + if config.PythonBin == "" {
29 + config.PythonBin = "python"
30 + }
31 +
27 return config, err 32 return config, err
28 } 33 }
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
7 "io/ioutil" 7 "io/ioutil"
8 "net/http" 8 "net/http"
9 "os" 9 "os"
10 + "os/exec"
10 11
11 "github.com/dgrijalva/jwt-go" 12 "github.com/dgrijalva/jwt-go"
12 "github.com/jmoiron/sqlx" 13 "github.com/jmoiron/sqlx"
...@@ -165,5 +166,7 @@ func (app *App) PostExtractions(c echo.Context) error { ...@@ -165,5 +166,7 @@ func (app *App) PostExtractions(c echo.Context) error {
165 166
166 tx.Commit() 167 tx.Commit()
167 168
169 + exec.Command(app.Config.PythonBin, "process.py", fmt.Sprint(extNo)).Run()
170 +
168 return c.NoContent(http.StatusNoContent) 171 return c.NoContent(http.StatusNoContent)
169 } 172 }
......
# Compute per-region call/message statistics for a single extraction and
# insert them into the `region_stats` table.
#
# Usage: python process.py <extraction_no>
# Invoked by the Go server after an extraction is committed; reads the same
# config.json the server uses for its database credentials.
import sys
import json

import pymysql
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Load the shared server configuration (closed automatically via `with`).
with open('config.json') as f:
    config = json.load(f)

# The host field may carry an optional ":port" suffix; default to 3306.
parts = config['database']['host'].split(':')
host = parts[0]
port = int(parts[1]) if len(parts) == 2 else 3306

db = pymysql.connect(
    user=config['database']['user'],
    passwd=config['database']['password'],
    host=host,
    port=port,
    db=config['database']['name'],
    charset='utf8'
)

ext_no = int(sys.argv[1])

# Korean area-code prefix -> region name.
regions = {
    '02': 'Seoul',
    '031': 'Gyeonggi',
    '032': 'Incheon',
    '033': 'Gangwon',
    '041': 'Chungnam',
    '042': 'Daejeon',
    '043': 'Chungbuk',
    '044': 'Sejong',
    '051': 'Busan',
    '052': 'Ulsan',
    '053': 'Daegu',
    '054': 'Gyeongbuk',
    '055': 'Gyeongnam',
    '061': 'Jeonnam',
    '062': 'Gwangju',
    '063': 'Jeonbuk',
    '064': 'Jeju'
}

spark = SparkSession.builder.getOrCreate()

try:
    cursor = db.cursor()

    # Fetch the raw call and message rows for this extraction.
    # Parameters must be passed as a tuple: `(ext_no)` is just an int,
    # `(ext_no,)` is a 1-tuple.
    cursor.execute("SELECT `type`, `number`, `duration` FROM calls WHERE `extraction_no`=%s", (ext_no,))
    calls = cursor.fetchall()

    cursor.execute("SELECT `type`, `address` FROM messages WHERE `extraction_no`=%s", (ext_no,))
    messages = cursor.fetchall()

    # NOTE(review): createDataFrame fails on an empty row list without an
    # explicit schema of types — assumes every extraction has at least one
    # call and one message; confirm against the caller.
    cdf = spark.createDataFrame(list(calls), schema=['type', 'number', 'duration'])
    mdf = spark.createDataFrame(list(messages), schema=['type', 'address'])

    result = None
    for prefix, name in regions.items():
        # Rows whose number/address starts with this area-code prefix.
        crdf = cdf[cdf['number'][0:len(prefix)] == prefix]
        mrdf = mdf[mdf['address'][0:len(prefix)] == prefix]

        # Total call duration for the region; SUM yields None when no rows
        # matched, so coerce to 0.
        duration = crdf.select(F.sum('duration')).collect()[0][0]
        if duration is None:
            duration = 0

        rdf = spark.createDataFrame(
            [(
                name,
                crdf[crdf['type'] == 1].count(),  # incoming calls
                crdf[crdf['type'] == 2].count(),  # outgoing calls
                duration,
                mrdf[mrdf['type'] == 1].count(),  # received messages
                mrdf[mrdf['type'] == 2].count()   # sent messages
            )],
            schema=['region', 'incoming', 'outgoing', 'duration', 'receive', 'send'])
        # `is None`, not `== None`: equality on a DataFrame is not a
        # meaningful null check.
        result = rdf if result is None else result.union(rdf)

    # Persist one row per region for this extraction.
    sql = "INSERT INTO region_stats VALUES (%s, %s, %s, %s, %s, %s, %s)"
    cursor.executemany(
        sql,
        [(ext_no, r[0], r[1], r[2], r[3], r[4], r[5]) for r in result.collect()])

    db.commit()
finally:
    # Release the connection and the Spark session even if a step failed.
    db.close()
    spark.stop()