How can I classify the rows of the Clients table using the rows of the Combinations table?
I decided to create a Combinations table that holds all the day combinations derived from the main table (Clients table).
I plan to check whether each client row matches a row of the Combinations table; if it does, the client is classified as Sector B (Combinations table).
I have this flow, but Databricks returns an error:
for i,j in select_df.iterrows():
    for u,v in dfCombinacionesDias.iterrows():
        if (
            (select_df["MONDAY"][i] == registro["LUNES"][u])
            and (select_df["TUESDAY"][i] == registro["MARTES"][u])
            and (select_df["WEDNESDAY"][i] == registro["MIERCOLES"][u])
            and (select_df["THURSDAY"][i] == registro["JUEVES"][u])
            and (select_df["FRIDAY"][i] == registro["VIERNES"][u])
            and (select_df["SATURDAY"][i] == registro["SABADO"][u])
            and (select_df["SUNDAY"][i] == registro["DOMINGO"][u])
        ):
            Sector = "B"
        else:
            Sector = "A"
    vSubSeq = "('{}','{}')".format(select_df["IDClient"][i], Sector)
    sqlInsertSequence = "Insert into {0}.{1} values {2}".format(dSCHEMA, Table, vSubSeq, vdataDeltaPath)
    print(sqlInsertSequence)
    dfTables = spark.sql(sqlInsertSequence)
I added an image with the different tables (Clients, Combinations and Sector).
I think I need a for loop that goes through the Combinations table row by row and compares each row with a row of the Clients table; if there is a match, I save that value in a new table (Sector table), and obviously there will be another for loop over the Clients table. But I would like to know an algorithm that helps compare the tables.
I assume that "x" in the posted data example works like a boolean trigger. So why not replace it with True and the empty cells with False? After that, we can apply logical operators directly to the data. For example, what does it mean that a client's days do not fit the "Sector B" pattern? Schematically it means that any(client_days & ~sector_b) is True, as in the following model:
import pandas as pd
week_days = 'mon tue wed thu fri sat sun'.split()
client_days = pd.Series([0,1,0,0,1,0,0], index=week_days)
sector_b = pd.Series([1,0,1,0,1,0,0], index=week_days)
assert any(client_days & ~sector_b)
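The complementary case is also easy to check: a client whose working days are a subset of the sector pattern never triggers the expression, because no day falls outside the pattern. A minimal sketch with the same toy series (fitting_client is made up for illustration):
# A client working only Monday and Wednesday stays within the mon/wed/fri
# pattern of sector_b, so no day falls outside it and any(...) is False.
fitting_client = pd.Series([1,0,1,0,0,0,0], index=week_days)
assert not any(fitting_client & ~sector_b)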
pandas 1.5.1
Let's model this idea in Pandas, as if we could apply toPandas to the original data:
import pandas as pd
week_days = 'mon tue wed thu fri sat sun'.split()
data = [
[0,1,0,0,1,0,0],
[1,0,1,0,1,0,0],
[1,0,1,0,0,0,0],
[1,0,0,0,0,0,0],
[1,0,0,0,1,0,0],
[0,0,1,0,1,0,0],
[0,0,0,0,1,0,0],
[0,0,1,0,0,0,0],
[1,1,1,1,1,1,1],
[1,0,1,0,0,0,0],
]
clients = pd.DataFrame(
data,
index=1 + pd.Index(range(len(data)), name='Client'),
columns=week_days,
dtype=bool
)
sectors = pd.DataFrame(
data=[[1,0,1,0,1,0,0]],
index=pd.Index(['Sector B'], name='sector'),
columns=week_days,
dtype=bool,
)
In this case we could use the dot operator, i.e. the scalar product, keeping in mind that addition and multiplication correspond to the or/and operations in the case of boolean data:
answer = (clients @ ~sectors.loc['Sector B']).map({True: 'A', False: 'B'})
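For the sample data above, only clients 1 and 9 have at least one working day outside the Sector B pattern, so they are the only ones mapped to 'A' (the PySpark section below shows the same result). A small sanity check, assuming the frames defined above:
# Clients 1 and 9 are the only rows with a day outside the mon/wed/fri pattern.
assert set(answer[answer == 'A'].index) == {1, 9}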
pyspark 3.4.1
Suppose that for some reason we can't use toPandas. Let's reorganize the data as if it were a PySpark DataFrame:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
clients_sdf = spark.createDataFrame(clients.reset_index())
sectors_sdf = spark.createDataFrame(sectors.reset_index())
How would we implement the idea if we are restricted to this data type? First, the sector data are small, so we can extract them into some sequence (e.g. a list). Next, we can apply map for the logical AND, then reduce for the logical OR, which gives us True for the "Sector A" cases and False otherwise. After that we apply when from pyspark.sql.functions to map the values:
from pyspark.sql.functions import lit, when
from functools import reduce
client_data = clients_sdf[week_days]
sector_b = [*sectors_sdf.where('sector == "Sector B"')[week_days].first()]
not_in_B = map(lambda x, y: x & lit(not y), client_data, sector_b)
is_in_sector_A = reduce(lambda x, y: x | y, not_in_B)
client_sector = when(is_in_sector_A, 'A').otherwise('B')
answer = clients_sdf.withColumn('Sector', client_sector).select('Client', 'Sector')
Output:
>>> answer.show()
+------+------+
|Client|Sector|
+------+------+
| 1| A|
| 2| B|
| 3| B|
| 4| B|
| 5| B|
| 6| B|
| 7| B|
| 8| B|
| 9| A|
| 10| B|
+------+------+
This is just a sketch of what it might look like in the general case. Suppose we have these data:
import pandas as pd
week_days = 'mon tue wed thu fri sat sun'.split()
data = [
[0,1,0,1,0,0,0], # changed to fit a new Sector A
[1,0,1,0,1,0,0],
[1,0,1,0,0,0,0],
[1,0,0,0,0,0,0],
[1,0,0,0,1,0,0],
[0,0,1,0,1,0,0],
[0,0,0,0,1,0,0],
[0,0,1,0,0,0,0],
[1,1,1,1,1,1,1], # fit Sector C
[1,0,1,0,0,0,0],
]
clients = pd.DataFrame(
data,
index=1 + pd.Index(range(len(data)), name='Client'),
columns=week_days,
dtype=bool
)
sectors = pd.DataFrame( # add Sector A, Sector C
data=[[0,1,0,1,0,1,0], [1,0,1,0,1,0,0], [1,1,1,1,1,1,1]],
index=pd.Index(['Sector A', 'Sector B', 'Sector C'], name='sector'),
columns=week_days,
dtype=bool,
)
We can see here 3 sectors presumably arranged in descending order of their priority, which we might want to represent in the final frame by their last letter.
Let's do it in Pandas:
isin_sector = ~(clients @ ~sectors.T)
answer = (
isin_sector
.apply(lambda column: column.map({True: column.name[-1]}))
.agg(lambda row: row.dropna()[0], axis='columns')
)
display(answer)
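For the modified data, client 1 now fits the new Sector A pattern, client 9 fits only Sector C, and everyone else still falls into Sector B. A quick check of that expectation, assuming the frames defined above:
# Expected classification for the modified sample data.
assert answer.loc[1] == 'A' and answer.loc[9] == 'C'
assert (answer.drop([1, 9]) == 'B').all()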
Now in PySpark, trying to avoid the Pandas API. Here, when applying coalesce, I rely on the fact that dictionaries in Python preserve the order in which items are added:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, when, coalesce
from functools import reduce

spark = SparkSession.builder.getOrCreate()
clients_sdf = spark.createDataFrame(clients.reset_index())
sectors_sdf = spark.createDataFrame(sectors.reset_index())
client_data = clients_sdf[week_days]

def is_in_sector(sector):
    '''sector : a boolean sequence'''
    return ~reduce(lambda x, y: x | y,
                   map(lambda x, y: x & lit(not y),
                       client_data, sector))

sectors = {
    (rest := rec.asDict()).pop('sector')[-1]: is_in_sector(rest.values())
    for rec in sectors_sdf.collect()
}
client_sector = coalesce(
    *(when(is_in_sec, sec_name) for sec_name, is_in_sec in sectors.items())
)
answer = clients_sdf.withColumn('Sector', client_sector).select('Client', 'Sector')
answer.show()
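If the logic matches the Pandas version above, show() should assign 'A' to client 1, 'C' to client 9 and 'B' to everyone else. A quick consistency check of that expectation (not re-run output):
# Expected result for the modified data, mirroring the Pandas answer above.
expected = {1: 'A', 9: 'C'}
for row in answer.collect():
    assert row['Sector'] == expected.get(row['Client'], 'B')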