# Phi_K Spark tutorial

This notebook shows how to obtain the Phi_K correlation matrix for a Spark DataFrame.
Calculating the Phi_K matrix consists of two steps:

- Obtain the 2d contingency tables for all variable pairs. To make these we use the [`histogrammar` package](https://github.com/histogrammar/histogrammar-python).
- Calculate the Phi_K value for each variable pair from its contingency table.

Make sure the `histogrammar` package is installed: it builds the 2d histograms from which phik is then calculated.
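
To make the idea of a 2d contingency table concrete, here is a minimal plain-Python sketch. It is not how `histogrammar` builds its histograms; the toy records and the names `rows`, `x_labels`, `y_labels` and `table` are hypothetical and only illustrate what such a table contains for one pair of categorical variables.

```python
from collections import Counter

# Hypothetical toy records: one (car_color, age_group) pair per row.
rows = [
    ("red", "young"),
    ("red", "young"),
    ("blue", "old"),
    ("red", "old"),
    ("blue", "old"),
]

# Count how often each (x, y) combination occurs.
counts = Counter(rows)

# Fix an explicit, sorted ordering of the categories on each axis.
x_labels = sorted({x for x, _ in rows})
y_labels = sorted({y for _, y in rows})

# Build the contingency table: rows follow x_labels, columns follow y_labels.
# Counter returns 0 for combinations that never occur.
table = [[counts[(x, y)] for y in y_labels] for x in x_labels]

print(x_labels, y_labels)
print(table)
```

Each cell holds the number of records that fall into that combination of categories, so the cells sum to the number of records.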

In [None]:
%%capture
# install histogrammar (if not installed yet)
import sys

!"{sys.executable}" -m pip install histogrammar

In [None]:
import itertools

import pandas as pd
import histogrammar as hg
from histogrammar.plot.hist_numpy import get_2dgrid

import phik
from phik import resources
from phik.phik import spark_phik_matrix_from_hist2d_dict

# histogramming is done using the histogrammar library

In [None]:
from pyspark.sql import SparkSession
from pyspark import __version__ as pyspark_version

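# choose the Scala build from the pyspark major version
# (parse the full major number, not just the first character)
scala = '2.12' if int(pyspark_version.split('.')[0]) >= 3 else '2.11'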
hist_jar = f'io.github.histogrammar:histogrammar_{scala}:1.0.20'
hist_spark_jar = f'io.github.histogrammar:histogrammar-sparksql_{scala}:1.0.20'

spark = SparkSession.builder.config(
    "spark.jars.packages", f'{hist_spark_jar},{hist_jar}'
).getOrCreate()

sc = spark.sparkContext

# Load data

The phik package ships with a simulated dataset of fake car insurance data. Load it here:

In [None]:
data = pd.read_csv(resources.fixture('fake_insurance_data.csv.gz'))
sdf = spark.createDataFrame(data)
sdf.show()

In [None]:
# all pairs of columns, including each column paired with itself
combis = itertools.combinations_with_replacement(sdf.columns, 2)
combis = [list(c) for c in combis]

In [None]:
print(combis)
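
As a small illustration of what `itertools.combinations_with_replacement` produces, the sketch below uses three hypothetical column names (`demo_columns` is not part of the dataset). It shows that each unordered pair appears once and that every column is also paired with itself, which gives n(n+1)/2 pairs for n columns.

```python
import itertools

# Hypothetical column names, standing in for sdf.columns.
demo_columns = ["a", "b", "c"]

# Unordered pairs of columns, with each column also paired with itself.
demo_pairs = [
    list(pair)
    for pair in itertools.combinations_with_replacement(demo_columns, 2)
]

for pair in demo_pairs:
    print(pair)

# For n columns there are n * (n + 1) / 2 such pairs.
n = len(demo_columns)
print(len(demo_pairs), n * (n + 1) // 2)
```

With three columns this yields six pairs, three of which pair a column with itself.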

# Step 1: create histograms (this runs Spark histogrammar in the background)

In [None]:
# see the docstring of hg_make_histograms() for binning options
hists = sdf.hg_make_histograms(combis)

In [None]:
# collect the numpy contingency tables into a dict
grids = {k: get_2dgrid(h)[2] for k, h in hists.items()}
print(grids)

In [None]:
# optionally store the contingency tables on disk and load them back
# (set the condition to True to run this)
if False:
    import pickle

    with open('grids.pkl', 'wb') as outfile:
        pickle.dump(grids, outfile)

    with open('grids.pkl', 'rb') as handle:
        grids = pickle.load(handle)

# Step 2: calculate phik matrix (runs RDD parallelization over all 2d histograms)

In [None]:
phik_matrix = spark_phik_matrix_from_hist2d_dict(sc, grids)

In [None]:
phik_matrix