Simulating Real-Estate Price Dynamics with an Heterogeneous Agent Model

Eric Östring
7 min readMay 3, 2021

This medium post investigates real-estate price dynamics by constructing a heterogeneous agent-based model of boundedly rational agents. The model accounts for pricing strategies based on past market fundamentals and past price development with agents altering behaviour according to a social dynamic.

png

The presented model builds upon a neoclassical stock-flow model which is extended to incorporate boundedly rational agents and a social dynamic of agent interaction. Heterogeneous agents consist of two types of real-estate market investors; fundamentalists who base their real-estate price valuation on market fundamentals and momentum traders who price real-estate assets depending on past price development. A recruitment model is also implemented, in which the probability of an agent converting to a specific strategy is increasing in the relative proportion of agents holding the respective strategy. This approach is augmented by allowing parameters of the recruitment model to be endogenously determined by relative past performance of an investment strategy against the alternative strategy, which is determined by a fitness model.

For more detail and mathematical definition of the simulation environment, see the corresponding dissertation.

import math
import numpy as np
import matplotlib.pyplot as plt
import quantecon as qe
import statsmodels.api as sm
import pandas as pd
class RealEstate:
"""
Heterogeneous Agent Based model.

Parameters:
------------
T: scalar
The current discrete time period
α_1: scalar
The Demand constant for property
E : np.array
The exogenous demand instrument
ε: scalar
Elasticity of demand for housing
δ : scalar
Depreciation rate
n : scalar
Lag period for construction
α_2: scalar
Supply of housing constant
η: scalar
Elasticity of supply of housing
S : np.array
Contains housing stock levels at each T
R : np.array
Contains rent levels at each T
P : np.array
Contains market forecasted asset price levels at each T
P_f : np.array
Contains fundamental trader's valuation
P_m : np.array
Contains chartists trader's valuation
m : scalar
Moving average memory parameter. Periods used to calculate moving average to determine valuation for P_m at a given year
C : np.array
Contains construction levels at each T
T : scalar
Discrete time unit, starts at 0
v : scalar
Share of Mometum Traders. (1-v) is share of Fundamentalists
q : scalar
Fitness model memory parameter. Memory parameter used to evaluate past performance.
y : scalar
Intensity of choice parameter. Measures how sensitive agents are in selecting optimal strategy
numAgents : scalar
The number of agents which have heterogeneous investment strategies
a : scalar
Sensitivity parameter that determines the influence that Fitness Model has on Recruitment Modeli
"""


def __init__(self, totalIterations=900, E=np.full(999, 10.0), ε=np.full(999, 0.4), δ=0.05, n=5,
η=np.full(999, 2.0), r=np.full(999, 0.05), T=0, S=np.array([2500.0]), R=np.array([20.0]), P=np.full(999, 400.0),
P_f=np.full(999, 400.0), P_m=np.full(999, 400.0), m=5, C=np.array([]), q=5, initShareFundamental=0.5,
w=np.array([0.5]), y=0.1, numAgents=100, a=0.1):

# Initializing variables
self.totalIterations, self.E, self.ε, self.δ, self.n, self.η, self.r, self.T, self.S, self.R, self.P, self.C, self.P_f, self.P_m, self.m, self.q, self.initShareFundamental, self.w, self.y, self.numAgents, self.a = totalIterations, E, ε, δ, n, η, r, T, S, R, P, C, P_f, P_m, m, q, initShareFundamental, w, y, numAgents, a


# Creating the stabele construction level
new_C = self.δ*self.S[0]
self.C = np.append(self.C, new_C)

# Solve for steady state α_2 and α_1
self.α_1 = (self.S[0]*(self.R[0]**self.ε[0]))/(self.E[0])
self.α_2 = (self.C[0]/self.S[0])*(1/(self.P[0]**self.η[0]))


# Assuming the past construction levels initiated from T = -n to T = -1 where equivalent to T=0,
# we account for the flow of this, hence S[0] to S[n-1] is already determined. Here we determine up to
# S[n-2] and S[n-1] will be determined in first call of the "iteration" method
for i in range(self.n-2):
new_S = (1-self.δ)*self.S[i] + self.C[0]
new_R = (new_S/(self.α_1*self.E[1+i]))**(1/-self.ε[1+i])

self.S = np.append(self.S, new_S)
self.R = np.append(self.R, new_R)


# Setting initial P_m to P_f
self.P_m[0] = self.P_f[0]

# Create initial population of investors of different investment strategy with even distribution
self.initAgent = qe.random.draw(np.cumsum((self.initShareFundamental,1-self.initShareFundamental)), self.numAgents)
self.agentBeliefs = self.initAgent
self.numMomentum = np.array([sum(self.agentBeliefs)])
self.v = np.array([self.numMomentum/self.numAgents])

self.T +=1

for i in range(self.totalIterations):
self.iteration()


def iteration(self):

self.update_S()
self.update_R()
self.update_Social_Dynamic()
self.update_P()
self.update_C()

self.T +=1


def update_S(self):
"""
Updates the np.array containing the Stock levels for each period
"""
# At time T the previously calculated C[T-1] and S[T+n-3] will determine S[T+n-2]
new_S = (1-self.δ)*self.S[self.T+self.n-3] + self.C[self.T-1]
self.S = np.append(self.S, new_S)

def update_R(self):
"""
Updates the np.array containing the Rent levels for each period
"""
# At time T the previously calculated S[T+n-2] and the exogenous E[T+n-2] determined R[T+n-2]
new_R = (self.S[self.T+self.n-2]/(self.α_1*self.E[self.T+self.n-2]))**(1/-self.ε[self.T+self.n-2])
self.R = np.append(self.R, new_R)

def update_Social_Dynamic(self):

# Social Dynamics start only after there have been more time periods than the memory parameter (q)
self.update_Fitness_Model()

# Before Fitness model memory parameter lag has lapsed, investors stick to their initial strategies
if self.T < self.q:
self.agentBeliefs = np.vstack((self.agentBeliefs, self.initAgent))
self.numMomentum = np.append(self.numMomentum,sum(self.agentBeliefs[-1,:]))
self.v = np.append(self.v, self.numMomentum[-1]/self.numAgents) # Update the array containing share of Momentum Traders

# Start introducing social dynamic only after q periods (memory parameter) has lapsed
else:
self.agentBeliefs = np.vstack((self.agentBeliefs, self.update_Recruitment_Model(self.agentBeliefs)))
self.numMomentum = np.append(self.numMomentum, sum(self.agentBeliefs[-1,:]))
self.v = np.append(self.v, self.numMomentum[-1]/self.numAgents)

def update_Fitness_Model(self):
"""
Updates the relative performance of momentum strategy relative fundamentalist strategy (w)
"""
u_f = 0 # Fitness of the Fundamentalist strategy
u_m = 0 # Fitness of the Mometum Trader strategy

if self.T >= self.q: # If Fitness model memory parameter lag has lapsed, check performance during the past q periods
for i in range(self.q):
u_f += (self.P[self.T-i-1] - self.P[self.T-i-2])*np.sign(self.P_f[self.T-i-1] - self.P_f[self.T-i-2])
u_m += (self.P[self.T-i-1] - self.P[self.T-i-2])*np.sign(self.P_m[self.T-i-1] - self.P_m[self.T-i-2])
u_f = u_f/self.q
u_m = u_m/self.q

new_w = (math.e**(self.y*u_m))/(math.e**(self.y*u_m)+math.e**(self.y*u_f))
self.w = np.append(self.w, new_w)

def update_Recruitment_Model(self, X, o=0.05):
"""
Takes in the an np.array where columns represent agents and rows represent agent's investing strategy at
a specific discrete time
"""
# "0" represents Fundamentalist and "1" represents Momentum Trader
# o is probability that agent independantly converts
# p_m is strength of convincing Fundamentalist (0) to Momentum Trader (1)
# p_f is strength of convincing Momentum Trader (1) to Fundamentalist (0)
p_m = self.a*self.w[self.T]
p_f = self.a-p_m
X = X[-1,:]

N = len(X) # Total number of agents
M = sum(X) # Total number of Momentum Traders
F = N - M # Total number of Fundamentalists

# First element relevant for Fundamentalists (0), Second relevant for Momentum Trader (1)
# "A" represents a process in which the probabilities between state transitions evolve according
# to the Fitness Model
A = np.array([[1 - o - p_m*(M/(N-1)), o + p_m*(M/(N-1))],
[o + p_f*(F/(N-1)), 1 - o - p_f*(F/(N-1))]])

# This is a vector that acts as a place holder for the latest investment strategies of the agents
X_new = np.empty(len(X), dtype=int)

# Converts each row in A into a cumulative distribution. Store this in temporary array A_dist
A_dist = [np.cumsum(A[i,:]) for i in range(len(A))]

# Create the new investment strategies
for i in range(len(X)):
# A_dist[X[i]] takes "previous" strategy of an agent and returns the relevant distributions
# which includes probabilities from which next strategy is chosen from
X_new[i] = qe.random.draw(A_dist[X[i]])

return X_new


def update_P(self):
"""
Updates the np.array containing the Price levels for each period
"""
self.update_P_f()
self.update_P_m()

# Re-assigns the whole vector at a time, P vector is constantly changing element values and size
self.P = self.v[self.T]*self.P_m + (1.0-self.v[self.T])*self.P_f[:len(self.P_f)]


def update_P_f(self):
for i in range(self.n):
# Project from P_f[T] to P_m[T+n-1] using current rents R[T]
# In next iteration, P_m[T] wont change but all projections beyond that will
self.P_f[self.T+i] = self.R[self.T]/self.r[self.T]

def update_P_m(self):
# We use P_m as an array that gets modified throughout with the most current price forecasts
if self.T < self.m:
self.P_m[self.T] = self.P_f[self.T] # if Moving average memory parameter has not lapsed, copy P_f
elif self.T >= self.m:
# Find price at T given past m period's MA growth rate and project to T+n-1 starting from T
growth = 0.0
for i in range(self.m-1):
# Determine the growth rate based on past m periods MA
growth += (self.P[self.T-1-i]-self.P[self.T-2-i])/self.P[self.T-2-i]


growth = growth/(self.m-1) # Divide by (m-1) as we are looking at past m period's growth rate per year averaged out. Not CAGR

for i in range(self.n):
# Project from P_m[T] to P_m[T+n-1] using the growth rate
# In next iteration, P_m[T] wont change but all projections beyond that will
self.P_m[self.T+i] = ((1+growth)**(1+i))*self.P[self.T-1]

def update_C(self):
# At every T, we determine only C[T]
new_C = self.α_2*(self.P[self.T+self.n-1]**self.η[self.T])*self.S[self.T+self.n-2]
self.C = np.append(self.C, new_C)
def plot():

names = np.array(["Price", "Relative Performance of Mometum Trader", "Share of Mometum Traders"])

values = np.array([re.P,re.w,re.v])

fig, axes = plt.subplots(len(names), 1, figsize=(10, 10))
fig.tight_layout(pad=4.0)

for i in range(len(values)):
axes[i].plot(values[i][0:300])
axes[i].set_title(names[i])
if i==0:
axes[i].set_ylim([0, 700])
axes[i].axvline(x=99, color="red", alpha=0.7, linestyle='dashed')
if i==1:
axes[i].set_ylim([0, 1])
axes[i].axvline(x=99, color="red", alpha=0.7, linestyle='dashed')
if i==2:
axes[i].set_ylim([0, 1])
axes[i].axvline(x=99, color="red", alpha=0.7, linestyle='dashed')
def plot2():

names = np.array(["Price", "Construction", "Stock", "Rent"])

values = np.array([re.P,re.C,re.S,re.R])

fig, axes = plt.subplots(len(names), 1, figsize=(10, 12))
fig.tight_layout(pad=4.0)

for i in range(len(values)):
axes[i].plot(values[i][0:re.totalIterations])
axes[i].set_title(names[i])
axes[i].set_xlim([0, 100])
axes[i].axvline(x=10, color="red", alpha=0.7, linestyle='dashed')

EXPERIMENTS

Test Full Model with Exogeneous -10% Demand Shock

exogenous = np.append(np.full(100, 10.0), np.full(899,9.0))

re = RealEstate(totalIterations=300, E=exogenous, m=5)

plot()

#plt.savefig('/Users/ericostring/Desktop/Edinburgh_Academics/Year_4/Dissertation/Figures_v3/ExperimentDemandIncrease.png')
png
# FIND THE VARIANCE IN PRICE AFTER NEGATIVE EXOGENOUS SHOCK IN MODEL WITHOUT POLICY
timeOfShock = 11
end_T = 100
num_simulations = 1000
recorded_variance_no_policy = np.array([])

for a in range(num_simulations):
exogenous = np.append(np.full(timeOfShock, 10.0), np.full(899,9.0))
re = RealEstate(totalIterations=end_T, E=exogenous)
recorded_variance_no_policy = np.append(recorded_variance_no_policy, np.var(re.P[timeOfShock:end_T]))
# FIND THE NORMALIZED STD IN PRICE AFTER NEGATIVE EXOGENOUS SHOCK IN MODEL WITHOUT POLICY
timeOfShock = 11
end_T = 100
num_simulations = 1000
recorded_normalized_std_no_policy = np.array([])

for a in range(num_simulations):
exogenous = np.append(np.full(timeOfShock, 10.0), np.full(899,9.0))
re = RealEstate(totalIterations=end_T, E=exogenous)
normalized = re.P[timeOfShock:end_T]/np.mean(re.P[timeOfShock:end_T])
recorded_normalized_std_no_policy = np.append(recorded_normalized_std_no_policy, np.std(normalized))
# Cut the window with 2 rows and 2 columns:
plt.subplot(221)
plt.scatter(re.R[0:re.totalIterations],re.S[0:re.totalIterations], marker='o', alpha=0.4)
plt.xlabel("Rent")
plt.ylabel("Stock")

plt.subplot(222)
plt.scatter(re.P[0:re.totalIterations], re.R[0:re.totalIterations], marker='o', color="orange", alpha=0.3)
plt.xlabel("Price")
plt.ylabel("Rent")


plt.subplot(223)
plt.scatter(re.C[0:re.totalIterations], re.P[0:re.totalIterations], marker='D', color="green", alpha=0.3)
plt.xlabel("Construction")
plt.ylabel("Price")

plt.subplot(224)
plt.scatter(re.S[0:re.totalIterations], re.C[0:re.totalIterations], marker='o', color="grey", alpha=0.3)
plt.xlabel("Stock")
plt.ylabel("Construction")

plt.subplots_adjust(top=1.5, right=1.5)
png

--

--