Using PySpark to Scale Markov Decision Problems for Policy Exploration

Finding policies that lead to optimal outcomes for an organization are some of the most difficult challenges facing decision makers within an organization. The reason for it is the fact that policies are not made in a world with perfect information and markets in equilibrium. These are complex systems where the behavior of entities within the system are dynamic and generally uncertain. Reinforcement Learning (RL) has gained popularity for modeling complex behavior to identify optimal strategy. RL maps states or situations to actions in order to maximize some result or reward. The Markov Decision Process (MDP) is a core component of the RL methodology. The Markov chain is a probabilistic model that uses the current state to predict the next state.

This presentation discusses using PySpark to scale an MDP example problem. When simulating complex systems, it can be very challenging to scale to large numbers of agents, due to the amount of processing that needs to be performed in memory as each agent goes through a permutation. PySpark allows us to leverage Spark for the distributed data processing and Python to define the states and actions of the agents.


1.Using PySpark to scale Markov Decision Problems for Policy Exploration Justin Brandenburg Machine Learning Architect @ Databricks #UnifiedDataAnalytics #SparkAISummit

2.About Me • Member of the Professional Services team at Databricks • Background in economics, cyber analytics and IT • Based in Washington, DC USA • Education: – Bachelors in Economics – Virginia Tech – Masters in Applied Economics – Johns Hopkins University – Masters in Computational Social Science – George Mason University • Previously worked as: – Senior Data Scientist for big data platform vendor – Lead Data Scientist for consulting company #UnifiedDataAnalytics #SparkAISummit 2

3.Agenda • Systems, Policies, Complexity and Modeling • Markov Decision Processes for Policy evaluation • Using PySpark for MDP modeling • Example • Demo • Summary #UnifiedDataAnalytics #SparkAISummit 3

4.Systems & Policies • A system is a group or combination of interrelated, interdependent, or interacting elements – Systems have purposes or goals – Policies are created to achieve desired outcomes • A policy is a combination of principles that are created to guide decisions and achieve rational outcomes • Policies that lead to ideal outcomes for a system are some of the most difficult challenges facing decision makers within an organization. #UnifiedDataAnalytics #SparkAISummit 4

5.Uncertainty Impacting Policy • A complex system is where each of the entities may be perfectly understood, but the behavior of the system as a whole cannot necessarily be predicted • Complex systems do not provide perfect information and never achieve equilibrium • Uncertainty and non-rational logic can lead to emergent behavior that policies can’t always account for #UnifiedDataAnalytics #SparkAISummit 5

6.Complex System Modeling • Agent-Based Modeling – Schelling’s Segregation Model – Sugarscape • Game Theory – Prisoners Dilemma – Texas Hold’em Poker • Discrete Event Simulation – A clinical diagnosis – Traffic accidents • Markov Decision Process – Traveling Salesman #UnifiedDataAnalytics #SparkAISummit 6

7.Markov Decision Process #UnifiedDataAnalytics #SparkAISummit 7

8.Evaluate all Strategies and Outcomes Source Screenrant #UnifiedDataAnalytics #SparkAISummit 8

9.Markov Decision Process • Framework for modeling decisions • A Markov Process describes the state of a system • When there is a possibility of making a decision (action) from a list of possible decisions it becomes a Markov Decision Process • Often applied in: – Energy Grid Optimization – Economic planning – Logistics – Risk Management – Robotics #UnifiedDataAnalytics #SparkAISummit 9

10.Why PySpark for MDP? “The power of intelligence stems from the our vast diversity, not from any single, perfect principle.” - Marvin Minsky, 1986. The Society of Mind. • Efforts to accurately represent real world problems has highlighted the inability for a single all encompassing model (one state-action space for one objective) to scale • Spark provides a distributed computing engine for scaling data analysis • MDPs are simulations, they create a large amount of data that is used to identify optimal processes #UnifiedDataAnalytics #SparkAISummit 10

11.Performing MDP in PySpark • MDPs are run using the Spark resilient distributed dataset (RDD) • Allowing for the ability to map functions to specific environments through key-value attributes • Each row in the RDD is an independent entity that does not interact with other entities, only with the policy and states 11

12.Agents • Agents are the entities interacting with the environment by executing certain actions, taking observations, and receiving eventual rewards • Goal is to identify optimal behavior based on policy parameters • Behavior is often a transition done in a sequential manner: 1. Decision is made 2. Action is performed 3. Outcome is evaluated 4. New decision is made #UnifiedDataAnalytics #SparkAISummit 12

13.Generating Agents from Existing Data class Projects_Agent: If you had an existing dataset of def __init__(self, line): self.project_id = line[0] projects and you were going to run self.start_date = line[1] What-If analysis on what would be self.cur_date = line[2] self.labor = line[3] the optimal schedule based on things = line[4] self.week_prod = line[5] like equipment availability, cost or self.ex_prod = line[6] external market factors. self.num_weeks = line[7] self.weekly_labor_costs = line[8] self.weekly_equip_costs = line[9] = True In this case, each line[x] would be mapped to to a column in our data def initialize_project(line): frame that is converted into an RDD. proj = Projects_Agent (line) return proj #UnifiedDataAnalytics #SparkAISummit 13

14.Generating Agents using Parameters Agents can be generated using data class Agent: as attributed parameters. This def __init__(self, row): allows for standing up boundaries of = row[0] behavior that the agents can def create_agents(row): agent = Agent(row) transition through based on policy agent.car_type_index = random.uniform(0,1) decisions. agent.car_type = 'gas' agent.car_loan = random.randint(0,30000) agent.avg_car_payment = loan_payment agent.annual_depreciation = 0.10 agent.number_of_payments = 0 agent.personal_property_tax = .04 #UnifiedDataAnalytics #SparkAISummit 14

15. Actions and State Transitions def policy_per_agent(row): Specify actions and transitions with agent = row credit = 2000 RDD transformation functions. if agent.car_type == 'gas' and agent.avg_car_payment >= 0: if agent.transportation_costs == agent.transportation_savings: switched_to_ev_vehicle(agent, credit) else: def switched_to_ev_vehicle(row, credit): pass agent = row return agent agent.car_type = 'ev' agent.car_loan = 40000 - credit agent.avg_car_payment = 500 agent.car_value = 35000 agent.gas_price = 0.00 agent.gallons = 0 agent.monthly_refuels = 0.0 agent.percentage_time_express_lanes = 1.00 agent.tolls_paid = 0 agent.commute_time = 30 agent.commuting_costs = 150.00 return agent #UnifiedDataAnalytics #SparkAISummit 15

16. Executing the MDP Create an MDP Function def run_mdp(row, time, policy): mdp_data = [] that executes the actions agent = create_agents(row) and transitions initialize_agent_attributes(agent) apply_mdp_using_policy(agent, time, mdp_data, policy) return mdp_data Instantiate the number of agents needed and convert to RDD. Apply function via flatMap() car_agents = 50000 agentRDD = spark.createDataFrame(zip(range(1, car_agents + 1)), ["driver_id"]).rdd t = 36 policy = 1 mdp_results = agentRDD.flatMap(lambda x:run_mdp(x,t,policy)).toDF() #UnifiedDataAnalytics #SparkAISummit 16

17.Example #UnifiedDataAnalytics #SparkAISummit 17

18.Electronic Vehicles and Toll Lanes • A local government enacted policy to reduce vehicle congestion during periods of the day when commuters are on their way to and from work • To reduce congestion along key routes toll lanes where put place to alleviate congestion and speed up commutes • The toll lanes are free for electronic vehicle commuters • Commuters who drive gas powered vehicles can use the tolls but the tolls increase the more cars that merge onto the toll lanes 18

19.Use Case • As more commuters switch to electronic vehicles, the toll lanes are increasingly becoming more congested leading to longer commute times • Could the incentives put in place by the policy makers have led to changes in commuter behavior at a faster pace than what was originally planned? 19

20.Agents • The agents in this example are commuters – Approximately 10% drive electronic vehicles – Among the commuters that drive gas vehicles • 50% have paid off their vehicles • 50% have an more payments to make 20

21.State • Each month the commuter evaluates the current state of transportation costs vs transportation savings • Commuters in gas vehicles show preference for short term rewards associated with: – Lower car loan payments or no payments – Lower property taxes • Commuters in EVs show preference for long term rewards associated with: – Increased savings due to no tolls or gas 21

22.Actions • If the commuter uses an electronic vehicle: – Has ability to switch to an EV if the costs associated with transportation meet a threshold where the short term benefits of low or zero monthly payments no longer outweigh the savings associated with purchasing an EV 22

23.Policies • Policy makers are evaluating updates to their commuter policy. • The policies under consideration are: A. Remove the price credit awarded to new EV owner thereby increasing cost of ownership B. Remove the price credit awarded to new EV owner and toll EV commutes, but at a lower rate than gas vehicle commuters C. Toll EV commuters at lower rate but provide the price credit for new purchases 23

24.Optimization Algos for MDPs • Value Iteration Method – Discrete time method – Start from some state, S, and respond to transitions according to stated policy for a horizon of N time periods, update an estimate of the optimal value repeatedly • Policy Iteration – 2 Steps: 1. Value Determination - arbitrarily selecting an initial policy P and then calculate marginal utility 2. Policy Improvement - a better policy is selected and the value determination step is repeated • Linear Programming – Identify the minimum and maximum value of a function subject to a set of constraints 24

25.Optimization for this Example • This example will use the Policy Iteration – Set of states is defined and static – There are simultaneous calculations for actions – Infinite horizon • Evaluate results for optimal result 25

26.Experiment Walkthrough 26

27.Additional Considerations • Discounting was included but was static • Transition probabilities may not stay the same over time • Did the policies choose the right agent attributes to subject to actions and transitions? • Adding random percentage of commuters who switch to EV from gas vehicles regardless of financial impact 27

28.Future Project Goals • Leverage Deep Learning frameworks for additional optimization for each agent • Considering each agent is looking to achieve best results, are those results the best for the group? • How can we share information between epochs to distribute information – In a distributed environment this is very challenging – Possibly just by agents in each partition -> local information sharing 28

29.Thank You Code for the simulations can be found on github #UnifiedDataAnalytics #SparkAISummit