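"""Summarise call and message records for a single extraction.

Reads the `calls` and `messages` rows for the extraction number given on
the command line, then writes per-day contact counts to `daily_contacts`
and per-region call/message statistics to `region_stats`.
"""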
import sys
import pymysql
import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

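# Database settings come from config.json in the working directory.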
with open('config.json') as f:
    config = json.load(f)

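# The configured host may be "host" or "host:port"; default to MySQL's standard port 3306.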
host, _, port_str = config['database']['host'].partition(':')
port = int(port_str) if port_str else 3306

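# A single connection is used for both the initial reads and the final inserts.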
db = pymysql.connect(
    user=config['database']['user'],
    password=config['database']['password'],
    host=host,
    port=port,
    database=config['database']['name'],
    charset='utf8'
)

cursor = db.cursor()

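# The extraction number to process is the script's only argument.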
ext_no = int(sys.argv[1])

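# Fetch every call and message row recorded for this extraction.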
cursor.execute("SELECT `type`, `number`, `duration`, `date` FROM calls WHERE `extraction_no`=%s", (ext_no))
calls = cursor.fetchall()

cursor.execute("SELECT `type`, `address`, `date` FROM messages WHERE `extraction_no`=%s", (ext_no))
messages = cursor.fetchall()

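# South Korean area-code prefixes (E.164 form) mapped to region names.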
regions = {
    '+822': 'Seoul',
    '+8231': 'Gyeonggi',
    '+8232': 'Incheon',
    '+8233': 'Gangwon',
    '+8241': 'Chungnam',
    '+8242': 'Daejeon',
    '+8243': 'Chungbuk',
    '+8244': 'Sejong',
    '+8251': 'Busan',
    '+8252': 'Ulsan',
    '+8253': 'Daegu',
    '+8254': 'Gyeongbuk',
    '+8255': 'Gyeongnam',
    '+8261': 'Jeonnam',
    '+8262': 'Gwangju',
    '+8263': 'Jeonbuk',
    '+8264': 'Jeju'
}

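# Reuse the active Spark session if one exists, otherwise start a new one.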
spark = SparkSession.builder.getOrCreate()

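# Load the fetched rows into DataFrames; column order matches the SELECTs above.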
cdf = spark.createDataFrame(list(calls), schema=['type', 'number', 'duration', 'date'])
mdf = spark.createDataFrame(list(messages), schema=['type', 'address', 'date'])

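# Count calls and messages per calendar day (to_date drops the time part).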
dfdc = cdf.select(F.to_date(F.col('date')).alias('date')).groupBy('date').agg(F.count('*').alias('calls'))
dfdm = mdf.select(F.to_date(F.col('date')).alias('date')).groupBy('date').agg(F.count('*').alias('messages'))

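# A full outer join keeps days that have only calls or only messages;
# coalesce turns the resulting NULL counts into zeros.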
dfd = dfdc.join(dfdm, ['date'], 'fullouter').select(
    'date',
    F.coalesce('calls', F.lit(0)).alias('calls'),
    F.coalesce('messages', F.lit(0)).alias('messages')
)

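# One daily_contacts row per day; column order is assumed to be
# (extraction_no, date, calls, messages).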
sql = "INSERT INTO daily_contacts VALUES (%s, %s, %s, %s)"
for r in dfd.collect():
    cursor.execute(sql, (ext_no, r['date'], r['calls'], r['messages']))

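# Per-region stats: match call numbers and message addresses by dialling
# prefix; call/message type 1 is taken as incoming/received and type 2 as
# outgoing/sent.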
rows = []
for key, region in regions.items():
    crdf = cdf.filter(F.col('number').startswith(key))
    mrdf = mdf.filter(F.col('address').startswith(key))

    # sum() returns NULL (None) when no calls match the prefix.
    duration = crdf.select(F.sum('duration')).collect()[0][0]
    if duration is None:
        duration = 0

    rows.append((
        region,
        crdf.filter(F.col('type') == 1).count(),  # incoming calls
        crdf.filter(F.col('type') == 2).count(),  # outgoing calls
        duration,
        mrdf.filter(F.col('type') == 1).count(),  # received messages
        mrdf.filter(F.col('type') == 2).count(),  # sent messages
    ))

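# One region_stats row per region; column order is assumed to be
# (extraction_no, region, incoming, outgoing, duration, receive, send).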
sql = "INSERT INTO region_stats VALUES (%s, %s, %s, %s, %s, %s, %s)"
for r in rows:
    cursor.execute(sql, (ext_no, *r))

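# Commit all inserts at once, then release the database and Spark resources.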
db.commit()
db.close()

spark.stop()