process.py
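# Usage: python process.py <extraction_no>
# Reads the call and message records for one extraction from MySQL,
# aggregates daily and per-region contact statistics with Spark, and writes
# the results back to the daily_contacts and region_stats tables.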
import sys
import pymysql
import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Load DB connection settings from config.json.
with open('config.json') as f:
    config = json.load(f)
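# Expected shape of config.json, inferred from the keys read below
# (values here are illustrative placeholders, not from the source):
# {
#     "database": {
#         "host": "db.example.com:3306",
#         "user": "analyst",
#         "password": "...",
#         "name": "extraction_db"
#     }
# }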
# The configured host may carry an optional ":port" suffix.
parts = config['database']['host'].split(':')
host = parts[0]
port = int(parts[1]) if len(parts) == 2 else 3306
db = pymysql.connect(
    host=host,
    port=port,
    user=config['database']['user'],
    password=config['database']['password'],
    db=config['database']['name'],
    charset='utf8'
)
cursor = db.cursor()

# The extraction to process is passed as the first CLI argument.
ext_no = int(sys.argv[1])
cursor.execute("SELECT `type`, `number`, `duration`, `date` FROM calls WHERE `extraction_no`=%s", (ext_no,))
calls = cursor.fetchall()
cursor.execute("SELECT `type`, `address`, `date` FROM messages WHERE `extraction_no`=%s", (ext_no,))
messages = cursor.fetchall()
# Korean dialing prefixes (in +82 international form) mapped to regions,
# e.g. a number starting with '+822' belongs to Seoul.
regions = {
    '+822': 'Seoul',
    '+8231': 'Gyeonggi',
    '+8232': 'Incheon',
    '+8233': 'Gangwon',
    '+8241': 'Chungnam',
    '+8242': 'Daejeon',
    '+8243': 'Chungbuk',
    '+8244': 'Sejong',
    '+8251': 'Busan',
    '+8252': 'Ulsan',
    '+8253': 'Daegu',
    '+8254': 'Gyeongbuk',
    '+8255': 'Gyeongnam',
    '+8261': 'Jeonnam',
    '+8262': 'Gwangju',
    '+8263': 'Jeonbuk',
    '+8264': 'Jeju'
}
spark = SparkSession.builder.getOrCreate()

# Lift the fetched rows into Spark DataFrames for aggregation.
cdf = spark.createDataFrame(list(calls), schema=['type', 'number', 'duration', 'date'])
mdf = spark.createDataFrame(list(messages), schema=['type', 'address', 'date'])
# Count calls and messages per calendar day, then merge the two series,
# treating days present in only one source as zero in the other.
dfdc = cdf.select(F.to_date(F.col('date')).alias('date')).groupBy('date').agg(F.count('*').alias('calls'))
dfdm = mdf.select(F.to_date(F.col('date')).alias('date')).groupBy('date').agg(F.count('*').alias('messages'))
dfd = dfdc.join(dfdm, ['date'], 'fullouter').select(
    'date',
    F.coalesce('calls', F.lit(0)).alias('calls'),
    F.coalesce('messages', F.lit(0)).alias('messages')
)
# Persist the per-day counts; the daily_contacts column order is assumed
# to be (extraction_no, date, calls, messages).
sql = "INSERT INTO daily_contacts VALUES (%s, %s, %s, %s)"
for r in dfd.collect():
    cursor.execute(sql, (ext_no, r[0], r[1], r[2]))
# Per-region traffic: rows whose number/address starts with the region's
# dialing prefix. Type 1/2 map to incoming/outgoing for calls and
# received/sent for messages, per the result schema below.
result = None
for key, val in regions.items():
    crdf = cdf[cdf['number'].startswith(key)]
    mrdf = mdf[mdf['address'].startswith(key)]
    duration = crdf.select(F.sum('duration')).collect()[0][0]
    if duration is None:
        duration = 0
    rdf = spark.createDataFrame(
        [(
            val,
            crdf[crdf['type'] == 1].count(),   # incoming calls
            crdf[crdf['type'] == 2].count(),   # outgoing calls
            duration,
            mrdf[mrdf['type'] == 1].count(),   # received messages
            mrdf[mrdf['type'] == 2].count()    # sent messages
        )],
        schema=['region', 'incoming', 'outgoing', 'duration', 'receive', 'send']
    )
    if result is None:
        result = rdf
    else:
        result = result.union(rdf)
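# Design note: the count()/collect() calls above launch several Spark jobs
# per region. A sketch of a cheaper alternative, assuming driver-side rows
# are acceptable, would accumulate plain tuples in a Python list and call
# spark.createDataFrame once after the loop instead of union-ing per region.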
# Persist the per-region stats; the region_stats column order is assumed to
# be (extraction_no, region, incoming, outgoing, duration, receive, send).
sql = "INSERT INTO region_stats VALUES (%s, %s, %s, %s, %s, %s, %s)"
for r in result.collect():
    cursor.execute(sql, (ext_no, r[0], r[1], r[2], r[3], r[4], r[5]))
db.commit()
db.close()
spark.stop()