How to perform a switch statement with Apache Spark DataFrames (Python)


I'm trying to perform an operation on my data where a certain value will be mapped to a list of pre-determined values if it matches one of the criteria, or to a fall-through value otherwise.

This is the equivalent SQL:

CASE
    WHEN user_agent LIKE '%canvasapi%' THEN 'api'
    WHEN user_agent LIKE '%candroid%' THEN 'mobile_app_android'
    WHEN user_agent LIKE '%icanvas%' THEN 'mobile_app_ios'
    WHEN user_agent LIKE '%canvaskit%' THEN 'mobile_app_ios'
    WHEN user_agent LIKE '%windows nt%' THEN 'desktop'
    WHEN user_agent LIKE '%macbook%' THEN 'desktop'
    WHEN user_agent LIKE '%iphone%' THEN 'mobile'
    WHEN user_agent LIKE '%ipod touch%' THEN 'mobile'
    WHEN user_agent LIKE '%ipad%' THEN 'mobile'
    WHEN user_agent LIKE '%ios%' THEN 'mobile'
    WHEN user_agent LIKE '%cros%' THEN 'desktop'
    WHEN user_agent LIKE '%android%' THEN 'mobile'
    WHEN user_agent LIKE '%linux%' THEN 'desktop'
    WHEN user_agent LIKE '%mac os%' THEN 'desktop'
    WHEN user_agent LIKE '%macintosh%' THEN 'desktop'
    ELSE 'other_unknown'
END AS user_agent_type

I'm new to Spark, and my first attempt at this program used a lookup dictionary and adjusted the values line by line in an RDD, like so:

user_agent_vals = {
    'canvasapi': 'api',
    'candroid': 'mobile_app_android',
    'icanvas': 'mobile_app_ios',
    'canvaskit': 'mobile_app_ios',
    'windows nt': 'desktop',
    'macbook': 'desktop',
    'iphone': 'mobile',
    'ipod touch': 'mobile',
    'ipad': 'mobile',
    'ios': 'mobile',
    'cros': 'desktop',
    'android': 'mobile',
    'linux': 'desktop',
    'mac os': 'desktop',
    'macintosh': 'desktop'
}

def parse_requests(line: list,
                   id_data: dict,
                   user_vals: dict = user_agent_vals):
    """
    Expects an input list which maps to the following indexes:
        0: user_id
        1: context(course)_id
        2: request_month
        3: user_agent_type

    :param line: A list of values.
    :return: list
    """
    found = False
    for key, value in user_vals.items():
        if key in line[3]:
            found = True
            line[3] = value
    if not found:
        line[3] = 'other_unknown'
    # Retrieves the session_id count from the id_data dictionary,
    # using the user_id as the key.
    session_count = id_data[str(line[0])]
    line.append(session_count)
    line.extend(config3.etl_list)
    return [str(item) for item in line]
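For context, a function like this would be applied record by record with map. A minimal sketch, where requests_rdd (a hypothetical name, not from the original) holds lists in the layout documented in the docstring and id_data is the user_id-to-session-count dictionary:

parsed_rdd = requests_rdd.map(lambda line: parse_requests(line, id_data))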

My current code has the data in a DataFrame, and I'm not sure how to perform the above operation efficiently. I know DataFrames are immutable, so the result needs to be returned as a new DataFrame, but my question is how best to do this. Here is my code:

from boto3 import client
import psycopg2 as ppg2
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import current_date, date_format, lit
from pyspark.sql.types import StringType

emr_client = client('emr')
conf = SparkConf().setAppName('Canvas Requests Logs')
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
# dependencies
# sc.addPyFile()

user_agent_vals = {
    'canvasapi': 'api',
    'candroid': 'mobile_app_android',
    'icanvas': 'mobile_app_ios',
    'canvaskit': 'mobile_app_ios',
    'windows nt': 'desktop',
    'macbook': 'desktop',
    'iphone': 'mobile',
    'ipod touch': 'mobile',
    'ipad': 'mobile',
    'ios': 'mobile',
    'cros': 'desktop',
    'android': 'mobile',
    'linux': 'desktop',
    'mac os': 'desktop',
    'macintosh': 'desktop'
}

if __name__ == '__main__':
    df = sql_context.read.parquet(
        r'/users/mharris/pycharmprojects/etl3/pyspark/datasets/'
        r'usage_data.gz.parquet')

    course_data = df.filter(df['context_type'] == 'course')
    request_data = df.select(
        df['user_id'],
        df['context_id'].alias('course_id'),
        # 'MM' is month-of-year in SimpleDateFormat ('mm' would be minutes)
        date_format(df['request_timestamp'], 'MM').alias('request_month'),
        df['user_agent']
    )

    sesh_id_data = df.groupBy('user_id').count()

    joined_data = request_data.join(
        sesh_id_data,
        on=request_data['user_id'] == sesh_id_data['user_id']
    ).drop(sesh_id_data['user_id'])

    all_fields = joined_data.withColumn(
        'etl_requests_usage', lit('dev')
    ).withColumn(
        'etl_datetime_local', current_date()
    ).withColumn(
        'etl_transformation_name', lit('agg_canvas_logs_user_agent_types')
    ).withColumn(
        'etl_pdi_version', lit(r'apache spark')
    ).withColumn(
        'etl_pdi_build_version', lit(r'1.6.1')
    ).withColumn(
        'etl_pdi_hostname', lit(r'n/a')
    ).withColumn(
        'etl_pdi_ipaddress', lit(r'n/a')
    ).withColumn(
        'etl_checksum_md5', lit(r'n/a')
    )

As a PS, is there a better way to add those columns than the way I've done it?

If you want, you can use a SQL expression directly:

expr = """
    CASE
        WHEN user_agent LIKE '%android%' THEN 'mobile'
        WHEN user_agent LIKE '%linux%' THEN 'desktop'
        ELSE 'other_unknown'
    END AS user_agent_type"""

df = sc.parallelize([
    (1, "android"), (2, "linux"), (3, "foo")
]).toDF(["id", "user_agent"])

df.selectExpr("*", expr).show()
## +---+----------+---------------+
## | id|user_agent|user_agent_type|
## +---+----------+---------------+
## |  1|   android|         mobile|
## |  2|     linux|        desktop|
## |  3|       foo|  other_unknown|
## +---+----------+---------------+
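Since the mapping in the question already lives in a Python dict, the CASE string can also be assembled rather than hand-written. A minimal sketch, assuming user_agent_vals and df from above; note that CASE branches are evaluated top to bottom, so if match precedence matters (e.g. 'candroid' also contains 'android'), keep the pairs in an ordered list rather than a plain dict, whose ordering is not guaranteed on older Pythons:

whens = "\n        ".join(
    "WHEN user_agent LIKE '%{0}%' THEN '{1}'".format(k, v)
    for k, v in user_agent_vals.items()
)
expr = "CASE\n        {0}\n        ELSE 'other_unknown'\n    END AS user_agent_type".format(whens)

df.selectExpr("*", expr).show()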

Otherwise you can replace it with a combination of when, like, and otherwise:

from functools import reduce
from pyspark.sql.functions import col, when

c = col("user_agent")
vs = [("android", "mobile"), ("linux", "desktop")]
expr = reduce(
    lambda acc, kv: when(c.like(kv[0]), kv[1]).otherwise(acc),
    vs,
    "other_unknown"
).alias("user_agent_type")

df.select("*", expr).show()

## +---+----------+---------------+
## | id|user_agent|user_agent_type|
## +---+----------+---------------+
## |  1|   android|         mobile|
## |  2|     linux|        desktop|
## |  3|       foo|  other_unknown|
## +---+----------+---------------+
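One caveat worth noting (my observation, not part of the original answer): because each reduce step wraps the previous accumulator in otherwise, the last pair in vs ends up being tested first. To reproduce the top-to-bottom precedence of the SQL CASE, and to get substring matching like the original query, reverse the pairs and add the % wildcards:

vs = [("%android%", "mobile"), ("%linux%", "desktop")]
expr = reduce(
    lambda acc, kv: when(c.like(kv[0]), kv[1]).otherwise(acc),
    reversed(vs),   # last-wrapped condition is checked first, so reverse
    "other_unknown"
).alias("user_agent_type")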

You can also add multiple columns in a single select:

from pyspark.sql.functions import current_date, lit

exprs = [c.alias(a) for (a, c) in [
    ('etl_requests_usage', lit('dev')),
    ('etl_datetime_local', current_date())]]

df.select("*", *exprs)
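Applied back to the question's code, the whole withColumn chain collapses into one projection; a sketch, assuming joined_data and the remaining ETL pairs from the question are appended to the list above:

all_fields = joined_data.select("*", *exprs)

Each withColumn call returns a new DataFrame, so this is mostly a readability win; Catalyst collapses consecutive projections either way.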
