How to perform a switch statement with Apache Spark DataFrames (Python)
I'm trying to perform an operation on my data where a certain value will be mapped to a list of pre-determined values if it matches one of the criteria, or to a fall-through value otherwise.
This is the equivalent SQL:
    CASE
        WHEN user_agent LIKE '%canvasapi%' THEN 'api'
        WHEN user_agent LIKE '%candroid%' THEN 'mobile_app_android'
        WHEN user_agent LIKE '%icanvas%' THEN 'mobile_app_ios'
        WHEN user_agent LIKE '%canvaskit%' THEN 'mobile_app_ios'
        WHEN user_agent LIKE '%windows nt%' THEN 'desktop'
        WHEN user_agent LIKE '%macbook%' THEN 'desktop'
        WHEN user_agent LIKE '%iphone%' THEN 'mobile'
        WHEN user_agent LIKE '%ipod touch%' THEN 'mobile'
        WHEN user_agent LIKE '%ipad%' THEN 'mobile'
        WHEN user_agent LIKE '%ios%' THEN 'mobile'
        WHEN user_agent LIKE '%cros%' THEN 'desktop'
        WHEN user_agent LIKE '%android%' THEN 'mobile'
        WHEN user_agent LIKE '%linux%' THEN 'desktop'
        WHEN user_agent LIKE '%mac os%' THEN 'desktop'
        WHEN user_agent LIKE '%macintosh%' THEN 'desktop'
        ELSE 'other_unknown'
    END AS user_agent_type
I'm new to Spark, and my first attempt at this program used a lookup dictionary and adjusted the values line by line in an RDD, like so:
    user_agent_vals = {
        'canvasapi': 'api',
        'candroid': 'mobile_app_android',
        'icanvas': 'mobile_app_ios',
        'canvaskit': 'mobile_app_ios',
        'windows nt': 'desktop',
        'macbook': 'desktop',
        'iphone': 'mobile',
        'ipod touch': 'mobile',
        'ipad': 'mobile',
        'ios': 'mobile',
        'cros': 'desktop',
        'android': 'mobile',
        'linux': 'desktop',
        'mac os': 'desktop',
        'macintosh': 'desktop'
    }

    def parse_requests(line: list, id_data: dict, user_vals: dict = user_agent_vals):
        """
        Expects an input list that maps to the following indexes:
            0: user_id
            1: context(course)_id
            2: request_month
            3: user_agent_type

        :param line: A list of values.
        :return: A list
        """
        found = False
        for key, value in user_vals.items():
            if key in line[3]:
                found = True
                line[3] = value
        if not found:
            line[3] = 'other_unknown'
        # Retrieves the session_id count from the id_data dictionary using the
        # user_id as the key.
        session_count = id_data[str(line[0])]
        line.append(session_count)
        line.extend(config3.etl_list)
        return [str(item) for item in line]
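For reference, a function like this gets applied line by line with map over an RDD; the snippet below is only a sketch, and the input path, delimiter, and id_data values are placeholders rather than anything shown above:

    # Sketch: placeholder input path and id_data; split each line, then classify it.
    id_data = {'12345': 7}                      # placeholder user_id -> session count lookup
    raw_lines = sc.textFile('usage_data.csv')   # placeholder path to a delimited log extract
    parsed = (raw_lines
              .map(lambda row: row.split(','))  # -> [user_id, course_id, request_month, user_agent]
              .map(lambda line: parse_requests(line, id_data)))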
My current code has the data in a DataFrame, and I'm not sure how to perform the above operation efficiently. I know that DataFrames are immutable, so the result needs to be returned as a new DataFrame, but my question is how to best do this. Here is my code:
    from boto3 import client
    import psycopg2 as ppg2
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.functions import current_date, date_format, lit, StringType

    emr_client = client('emr')
    conf = SparkConf().setAppName('canvas requests logs')
    sc = SparkContext(conf=conf)
    sql_context = SQLContext(sc)
    # dependencies
    # sc.addPyFile()

    user_agent_vals = {
        'canvasapi': 'api',
        'candroid': 'mobile_app_android',
        'icanvas': 'mobile_app_ios',
        'canvaskit': 'mobile_app_ios',
        'windows nt': 'desktop',
        'macbook': 'desktop',
        'iphone': 'mobile',
        'ipod touch': 'mobile',
        'ipad': 'mobile',
        'ios': 'mobile',
        'cros': 'desktop',
        'android': 'mobile',
        'linux': 'desktop',
        'mac os': 'desktop',
        'macintosh': 'desktop'
    }

    if __name__ == '__main__':
        df = sql_context.read.parquet(
            r'/users/mharris/pycharmprojects/etl3/pyspark/datasets/'
            r'usage_data.gz.parquet')

        course_data = df.filter(df['context_type'] == 'course')

        request_data = df.select(
            df['user_id'],
            df['context_id'].alias('course_id'),
            date_format(df['request_timestamp'], 'MM').alias('request_month'),
            df['user_agent']
        )

        sesh_id_data = df.groupBy('user_id').count()

        joined_data = request_data.join(
            sesh_id_data,
            on=request_data['user_id'] == sesh_id_data['user_id']
        ).drop(sesh_id_data['user_id'])

        all_fields = joined_data.withColumn(
            'etl_requests_usage', lit('dev')
        ).withColumn(
            'etl_datetime_local', current_date()
        ).withColumn(
            'etl_transformation_name', lit('agg_canvas_logs_user_agent_types')
        ).withColumn(
            'etl_pdi_version', lit(r'apache spark')
        ).withColumn(
            'etl_pdi_build_version', lit(r'1.6.1')
        ).withColumn(
            'etl_pdi_hostname', lit(r'n/a')
        ).withColumn(
            'etl_pdi_ipaddress', lit(r'n/a')
        ).withColumn(
            'etl_checksum_md5', lit(r'n/a')
        )
As a PS, is there a better way to add the columns than the way I've done it?
If you want, you can use the SQL expression directly:
expr = """ case when user_agent \'%android%\' \'mobile\' when user_agent \'%linux%\' \'desktop\' else \'other_unknown\' end user_agent_type""" df = sc.parallelize([ (1, "android"), (2, "linux"), (3, "foo") ]).todf(["id", "user_agent"]) df.selectexpr("*", expr).show() ## +---+----------+---------------+ ## | id|user_agent|user_agent_type| ## +---+----------+---------------+ ## | 1| android| mobile| ## | 2| linux| desktop| ## | 3| foo| other_unknown| ## +---+----------+---------------+
Otherwise you can replace it with a combination of when, like, and otherwise:
    from pyspark.sql.functions import col, when
    from functools import reduce

    c = col("user_agent")
    vs = [("android", "mobile"), ("linux", "desktop")]
    expr = reduce(
        lambda acc, kv: when(c.like(kv[0]), kv[1]).otherwise(acc),
        vs,
        "other_unknown"
    ).alias("user_agent_type")

    df.select("*", expr).show()
    ## +---+----------+---------------+
    ## | id|user_agent|user_agent_type|
    ## +---+----------+---------------+
    ## |  1|   android|         mobile|
    ## |  2|     linux|        desktop|
    ## |  3|       foo|  other_unknown|
    ## +---+----------+---------------+
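The same reduce can be driven by the full dictionary from the question. In the sketch below, wrapping the keys in SQL wildcards and selecting against joined_data are assumptions about how it would plug into the question's pipeline:

    # Sketch: chain one when/otherwise per dictionary entry, with '%...%' wildcards.
    c = col("user_agent")
    expr = reduce(
        lambda acc, kv: when(c.like('%{0}%'.format(kv[0])), kv[1]).otherwise(acc),
        list(user_agent_vals.items()),
        "other_unknown"
    ).alias("user_agent_type")

    typed_data = joined_data.select("*", expr)

Note that when several keys match the same user agent, which label wins depends on the dictionary's iteration order.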
You can also add multiple columns in a single select:
    exprs = [c.alias(a) for (a, c) in [
        ('etl_requests_usage', lit('dev')),
        ('etl_datetime_local', current_date())]]

    df.select("*", *exprs)
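Applied to the question's full set of etl_* columns, that pattern would look roughly like this (the literals are copied from the withColumn chain above; using it as a drop-in replacement on joined_data is an assumption):

    # Sketch: all etl_* literals added in one select instead of eight chained withColumn calls.
    etl_columns = [
        ('etl_requests_usage', lit('dev')),
        ('etl_datetime_local', current_date()),
        ('etl_transformation_name', lit('agg_canvas_logs_user_agent_types')),
        ('etl_pdi_version', lit('apache spark')),
        ('etl_pdi_build_version', lit('1.6.1')),
        ('etl_pdi_hostname', lit('n/a')),
        ('etl_pdi_ipaddress', lit('n/a')),
        ('etl_checksum_md5', lit('n/a')),
    ]
    exprs = [c.alias(a) for (a, c) in etl_columns]
    all_fields = joined_data.select("*", *exprs)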