############################################################################################################# ### metapopulation.py - A program for simluating metapopulation dynamics. ### ### ### ### David Fallaize (MRes student), CoMPLEX, UCL ### ### December 2006 ### ### ### ############################################################################################################# import sys import math, random from Tkinter import * # Tkinter is the handy python module for producing a GUI import tkMessageBox ############################################################################################################# ### Some constants. ### ############################################################################################################# CANVAS_SIZE = 500 # 500 x 500 pixels - initial canvas size CANVAS_SCALE_FACTOR = 25 # 25 pixels represents 1 km. CONVERGENCE_CRITERION = 1e-6 # a low number to check for convergence in successive iterations MAXIMUM_ITERATIONS = 10000 # maximum iterations to bother with (avoids infinite loops) NUMBER_OF_TIMESTEPS = 100 # number of time steps to carry out in the time stepping routine ############################################################################################################# ### Class to represent a population patch ### ############################################################################################################# class Patch: # This is the major class which represents a single population patch. # However the class itself also keeps track of all the patches via the # Patch.all_patches syntax - i.e. all_patches is a list of all the instances of # Patch objects, and is available to all Patches, so it is simple for a patch # to query all the other patches. # a list of all Patches in the model resides in the Class Patch (class variable) all_patches = [] # the canvas we use to draw the patches - also needs to be accessible by all instances # of Patch objects, so is a class variable. window = None canvas = None ############################################################################################################# ### Constructor and destructor. ### ############################################################################################################# def __init__(self, x, y, area, occ): # x and y are co-ordinates in km, area is square kilometres, occ is boolean (occupied or not) # add this patch to our global (class) list of patches Patch.all_patches.append(self) # Var holders for the Tk Entry widgets we will create self.xVar = DoubleVar() # holds x centre co-ord self.xVar.set(x) self.yVar = DoubleVar() # holds y centre co-ord self.yVar.set(y) self.areaVar = DoubleVar() # holds area self.areaVar.set(area) self.logAreaVar = DoubleVar() # holds log of area self.logAreaVar.set(math.log(area)) self.occVar = BooleanVar() # holds whether occupied or not self.occVar.set(occ) # Tk widget handles self.handle = None # handle for the circle representing this patch self.text_handle = None # handle for the text label numbering this patch # the model variables for this population self.occupation_probability = 1.0 self.connectivity = 0.0 self.colonization_probability = 0.0 self.draw() def __del__(self): # as part of the destructor we have to undraw ourselves from the canvas, and # removed ourselves from the Class level list of all Patch instances self.undraw() Patch.all_patches.remove(self) ############################################################################################################# ### Patch properties - allow simpler manipulation of the underlying variables. ### ############################################################################################################# # area property definition def getArea(self): return self.areaVar.get() def setArea(self, new_area): self.areaVar.set(new_area) area = property(getArea, setArea, doc='area of the patch (square kilometres)') # x property definition - is x co-ord of patch in canvas co-ords # property allows underlying variable to be in kilometres (or whatever) and for # all canvas scaling to be carried out transparently def getX(self): return math.floor(self.xVar.get() * CANVAS_SCALE_FACTOR) def setX(self, newx): self.xVar.set(newx / CANVAS_SCALE_FACTOR) x = property(getX, setX, doc='set x co-ordinate of centre point (canvas co-ords)') # y property definition - as x above def getY(self): return math.floor(self.yVar.get() * CANVAS_SCALE_FACTOR) def setY(self, newy): self.yVar.set(newy / CANVAS_SCALE_FACTOR) y = property(getY, setY, doc='set y co-ordinate of centre point (canvas co-ords)') # radius property definition - radius in canvas pixels of a patch # (Again, scaling carried out transparently) def getRadius(self): return math.floor(math.sqrt(self.area / math.pi) * CANVAS_SCALE_FACTOR) radius = property(getRadius, doc='radius of the patch (canvas co-ords)') # bbox property def getBBox(self): # calculate the top left and bottom right co-ordinates of the bounding box for the circle # this is used for drawing the circle using create_oval function in Tk r = self.radius return (self.x - r, self.y - r, self.x + r, self.y + r) bbox = property(getBBox, doc='bounding box of the patch for drawing purposes (canvas pixel co-ords)') # occupancy property def getOccupancy(self): return self.occVar.get() def setOccupancy(self, newOccupancy): self.occVar.set(newOccupancy) occupancy = property(getOccupancy, setOccupancy, doc='set the occupancy of the patch (True or False)') # colour property def getColour(self): if self.occupancy: return 'red' else: return 'black' colour = property(getColour, doc='get fill colour (depends on occupancy of the patch)') def getId(self): # return the id number of this patch instance return Patch.all_patches.index(self) id = property(getId) ############################################################################################################# ### Helper functions for inter-patch distance. ### ############################################################################################################# # static function to calculate Euclidean distance def euclideanDistance(point_a, point_b): # calculate the euclidean distance between points a and b (specified as (x,y) tuples) return math.sqrt((point_a[0] - point_b[0]) ** 2 + (point_a[1] - point_b[1]) ** 2) euclideanDistance = staticmethod(euclideanDistance) def distance(cls, i ,j): # get distance between two patches - is a class method patchi = cls.all_patches[i] patchj = cls.all_patches[j] return Patch.euclideanDistance((patchi.xVar.get(), patchi.yVar.get()), (patchj.xVar.get(), patchj.yVar.get())) distance = classmethod(distance) ############################################################################################################# ### These are the actual functions to do the Metapopulation maths - i.e. implementation of eqs 1-3 ### ############################################################################################################# def iterateConnectivity(self, alpha, parm_x, parm_y): # calculate Si (connectivity) for this patch, using all the emmigration probabilities and areas of the # other patches in the model (Eqn 1). Alpha is passed in as a parameter, as are parm_x and parm_y. # This patch is the i'th patch i = self.id # stash the current values, so that we can compare the results in a second to see if we've converged old_si, old_pi, old_ci = self.connectivity, self.occupation_probability, self.colonization_probability # reset the connectivity before we carry out the summation in this loop self.connectivity = 0 for patch in Patch.all_patches: j = patch.id # the other patch is the j'th patch in my array of patches if i != j: self.connectivity += math.exp(-alpha * Patch.distance(i,j)) * patch.occupation_probability * patch.area # update pi and Ci using newly calculated Si self.calculateColonizationProbability(parm_y) self.calculateOccupationProbability(parm_x, parm_y) # see if we've converged if (math.fabs(old_si - self.connectivity) + math.fabs(old_pi - self.occupation_probability) + math.fabs(old_ci - self.colonization_probability)) > CONVERGENCE_CRITERION: # not converged return False else: # have converged return True def calculateColonizationProbability(self, parm_y): # calculate Ci for this patch (Eqn 2) # parm_y is the y parameter which must be supplied. SiSquared = self.connectivity ** 2 # Ci self.colonization_probability = SiSquared / ((parm_y ** 2) + SiSquared) def calculateOccupationProbability(self, parm_x, parm_y): # calculate the emmigration probability pi (Eqn 3) # parm_x and parm_y are model parameters which must be passed in SiSquared = self.connectivity ** 2 denom = ((parm_y ** 2) / (self.area ** parm_x)) + SiSquared # pi self.occupation_probability = SiSquared / denom def timeStep(self): # carry out one time step using the probabilities produced by the iterate function # (should have called iterate before calling this function) rand = random.random() # get a random number between 0.0 and 1.0 if self.occupancy == True: # if occ=1 then occ=0 on next time step with probability pi # --> test if the random number is =< than occupation probability if rand <= self.occupation_probability: self.occupancy = False else: # if occ=0 then occ=1 on the next time step with probability Ci if rand <= self.colonization_probability: self.occupancy = True ############################################################################################################# ### End of metapopulation functions ### ############################################################################################################# ############################################################################################################# ### Display functions for drawing and moving patches on the canvas. Pretty boring. ### ############################################################################################################# def toggleOccupancy(self, *args): # toggle occupancy of this patch self.occupancy = not(self.occupancy) self.canvas.itemconfigure(self.handle, fill=self.colour) def movePatch(self, event): # event handler for when the user moves a patch with the mouse r = self.radius oldx = self.x oldy = self.y newx = self.canvas.canvasx(event.x) newy = self.canvas.canvasy(event.y) # constrain the patches to be within the canvas if newx > CANVAS_SIZE: newx = CANVAS_SIZE if newy > CANVAS_SIZE: newy = CANVAS_SIZE if newx < 0: newx = 0 if newy < 0: newy = 0 self.setX(newx) self.setY(newy) self.canvas.coords(self.handle, newx-r, newy-r, newx+r, newy+r) # move the patch by however much the mouse moved self.canvas.coords(self.text_handle, newx, newy) # move the patch label number too! # refresh inter-patch distance matrix DistanceVar.refresh() def bringToTop(self, event): # bring patch to top of the canvas stack Patch.canvas.lift(self.handle) Patch.canvas.lift(self.text_handle) def setFocus(self, event): # event handler for when the user focuses on a patch with the mouse # opens up a Patch Editor window where user can edit area and co-ords PatchEditor.editPatch(self) def draw(self, *args): # draw this patch # canvas housekeeping try: # this will fail if the canvas has been closed / hasn't been created yet... Patch.canvas.config() except: # ... in which case create the canvas! Patch.window = Toplevel(Model.master) Patch.window.title("Patch layout") Patch.canvas = Canvas(Patch.window, height=CANVAS_SIZE, width=CANVAS_SIZE) Patch.canvas.create_rectangle(2,2,CANVAS_SIZE+1,CANVAS_SIZE+1) Patch.canvas.pack(fill=NONE, expand=False) # do this here for when we get a callback from the PatchEditor which might change the x,y co-ords DistanceVar.refresh() # maintain area vars if args is not None: # if this has been called by the PatchEditor then check for discrepency between logArea and # the area we have stored - it may have been edited so need to update. if (math.fabs(math.log(self.area) - self.logAreaVar.get()) > 0.001): self.areaVar.set(math.exp(self.logAreaVar.get())) # undraw us if we are currently drawn self.undraw() # draw the circle, assign event handlers for mouse clicking self.handle = Patch.canvas.create_oval(self.bbox, fill=self.colour) Patch.canvas.tag_bind(self.handle, '', self.bringToTop) Patch.canvas.tag_bind(self.handle, '', self.toggleOccupancy) Patch.canvas.tag_bind(self.handle, '', self.setFocus) Patch.canvas.tag_bind(self.handle, '', self.movePatch) # draw the text label (number of this patch), and assign mouse event handlers self.text_handle = Patch.canvas.create_text(self.x, self.y, text=str(self.id + 1), fill='white') Patch.canvas.tag_bind(self.text_handle, '', self.bringToTop) Patch.canvas.tag_bind(self.text_handle, '', self.toggleOccupancy) Patch.canvas.tag_bind(self.text_handle, '', self.setFocus) Patch.canvas.tag_bind(self.text_handle, '', self.movePatch) def undraw(self): # remove this patch from the canvas try: self.canvas.delete(self.handle) self.canvas.delete(self.text_handle) except: None ############################################################################################################# ### Class to control the Model. ### ############################################################################################################# class Model: # This is the main class for holding the Tk elements which make up the GUI # Model parameters are controlled in here, and this class holds the functions for adding # new patches (or instantiating them anyway...), for controlling the iteration functions, # and for carrying out the time stepping. master = Tk() master.title("Metapopulation dynamics (David Fallaize, CoMPLEX, 2006)") converged_values = None button_frame = None time_step = None add_patch = None iterate = None reciprocalAlphaVar = None alphaVar = StringVar() parm_xVar = None parm_yVar = None states = None current_timestep = 0 ############################################################################################################# ### Controller functions for manipulating all the patches. ### ############################################################################################################# def addPatch(x=None, y=None, area=None, occ=None): # function to add a patch to the model # if we have not been explicitly passed Patch parameters, randomly create some # x and y co-ords (in km) are somewhere within the range allowed by the canvas and scaling # area is derived from a randomly selected radius, plus a minimum of 5 km2 # occ is 50/50 occupied or not if x is None: x = random.random() * CANVAS_SIZE / CANVAS_SCALE_FACTOR if y is None: y = random.random() * CANVAS_SIZE / CANVAS_SCALE_FACTOR if area is None: area = 5 + math.pi * (random.random() * (CANVAS_SIZE / CANVAS_SCALE_FACTOR) / 5) ** 2 if occ is None: occ = random.random() < 0.5 # create the patch p = Patch(x,y,area,occ) # (re)draw the distance matrix DistanceMatrix.draw() # focus the patch editor window on this patch PatchEditor.editPatch(p) # make sure the controller window is still on top Model.master.lift() # this is a static method of the class addPatch = staticmethod(addPatch) def deletePatch(patch): # delete a patch from the model patch.__del__() DistanceMatrix.draw() Model.redraw() deletePatch = staticmethod(deletePatch) def redraw(): # redraw the whole canvas for p in Patch.all_patches: p.draw() redraw = staticmethod(redraw) ############################################################################################################# ### Controller functions to iterate and time step the model. ### ############################################################################################################# def iterateModel(cls): # iterate over the model figuring out the steady state probabilities and connectivity across the model # reset the Patch variables. for patch in Patch.all_patches: patch.occupation_probability = 1.0 patch.colonization_probability = 0.0 patch.connectivity = 0.0 # iterate the model to convergence # give up if takes more than MAXIMUM_ITERATIONS to converge # (but always do at least 10 iterations...) converged = False counter = 0 while not converged: converged = True # avoid infinite loops counter += 1 if counter > MAXIMUM_ITERATIONS: tkMessageBox.showerror(title="Error",message="Could not converge") return # loop over all patches in the model and get them to calculate their P, C, S values # if they don't change much then patch.iterateConnectivity will return True indicating # that patch appears to have converged, otherwise False, indicating the values are still # changing. for patch in Patch.all_patches: this_patch_converged = patch.iterateConnectivity(1 / Model.reciprocalAlphaVar.get(), Model.parm_xVar.get(), Model.parm_yVar.get()) converged = (counter > 10) and converged and this_patch_converged # function call to display a little window showing what all the converged values were cls.displayConvergedValues(counter) # finally enable the Time Step button now that we've calculated the probabilities cls.time_step.config(state='normal') # set this function as a classmethod iterateModel = classmethod(iterateModel) # This function carries out the time stepping 100 times, then stores the results. # The canvas is then commandeered, a 'next' and 'prev' button added, and the patches frozen into # position so the user can step through the resultant time steps. def timeStepAll(): # carry out time-stepping N = len(Patch.all_patches) # number of patches Model.states = [] for s in range(NUMBER_OF_TIMESTEPS+1): Model.states.append({}) for patch in Patch.all_patches: Model.states[s][patch.id] = patch.occupancy # store state of p'th patch at s'th timestep patch.timeStep() # destroy any open patch editor window try: PatchEditor.editor.destroy() except: pass # disable some buttons Model.iterate.config(state='disabled') Model.add_patch.config(state='disabled') Model.time_step.config(state='disabled') Model.parm_xVar.config(state='disabled') Model.parm_yVar.config(state='disabled') Model.reciprocalAlphaVar.config(state='disabled') # commandeer the Patch canvas, remove mouse event bindings Patch.window.title("Occupancy number dynamics - red=occupied, black=unoccupied") for item in Patch.canvas.find_all(): Patch.canvas.tag_unbind(item, '') Patch.canvas.tag_unbind(item, '') Patch.canvas.tag_unbind(item, '') Patch.canvas.tag_unbind(item, '') # add out buttons and label to the canvas for stepping through the states Button(Patch.window, text="prev", command=Model.showPrevState).pack(side=LEFT) Button(Patch.window, text="next", command=Model.showNextState).pack(side=RIGHT) Model.state_label = Label(Patch.window) Model.state_label.pack() Model.showState() timeStepAll = staticmethod(timeStepAll) def showNextState(cls): # show states at t+1 (increment current_timestep) cls.current_timestep += 1 if cls.current_timestep > NUMBER_OF_TIMESTEPS: cls.current_timestep=NUMBER_OF_TIMESTEPS cls.showState() showNextState = classmethod(showNextState) def showPrevState(cls): # show states at t-1 (decrement current_timestep) cls.current_timestep -= 1 if cls.current_timestep < 0: cls.current_timestep = 0 cls.showState() showPrevState = classmethod(showPrevState) def showState(cls): # show state at current_timestep # update the patch states for patch in Patch.all_patches: if patch.occupancy != cls.states[cls.current_timestep][patch.id]: patch.toggleOccupancy() # update the text label Model.state_label.config(text="""Timestep t=%d""" %cls.current_timestep) showState = classmethod(showState) ############################################################################################################# ### End of interesting controller functions ### ############################################################################################################# ############################################################################################################# ### Relatively boring Tk display functions - nothing much interesting here... ### ############################################################################################################# def initButtonLayout(cls): explanatory_text = """Use "add patch" to add patches to the canvas, which may be moved around with the mouse. Double click to change occupancy (red=occupied, black=unoccupied). Press "iterate" to calculate steady state values for Pi, Ci, Si. Then press time step to obtain time stepped diagrams. Model parameters may be varied with the sliders below.""" Label(cls.master, text=explanatory_text, wraplength=400, relief=RIDGE).grid() button_frame = Frame(cls.master) cls.add_patch = Button(button_frame, text="Add Patch", width=10, command=cls.addPatch) cls.add_patch.grid(row=3, column=1) cls.iterate = Button(button_frame, text="Iterate", width=10, command=cls.iterateModel) cls.iterate.grid(row=3, column=2) cls.time_step = Button(button_frame, text="Time Step", width=10, command=cls.timeStepAll, state='disabled') cls.time_step.grid(row=3, column=3) Button(button_frame, text="Exit", width=10, command=sys.exit).grid(row=3, column=4, padx=50, sticky="E") button_frame.grid(row=3, padx=25, pady=25, sticky="E") parm_frame = Frame(cls.master) cls.reciprocalAlphaVar = Scale(parm_frame, label="1/alpha", from_=1, to=50, resolution=1e-3, width=10, showvalue=True, orient=HORIZONTAL, state='normal', command=cls.updateAlpha) cls.reciprocalAlphaVar.grid(row=1, column=1) cls.reciprocalAlphaVar.set(1 / 0.06) alpha_frame = Frame(parm_frame) Label(alpha_frame, text="alpha").grid(sticky="E") Entry(alpha_frame, state='disabled', width=5, textvariable=cls.alphaVar).grid(sticky="W") alpha_frame.grid(row=2, column=1) cls.parm_xVar = Scale(parm_frame, label="x", from_=0.1, to=1.0, resolution=1e-2, width=10, showvalue=True, orient=HORIZONTAL, state='normal') cls.parm_xVar.grid(row=1, column=2) cls.parm_xVar.set(0.5) cls.parm_yVar = Scale(parm_frame, label="y", from_=1, to=250, resolution=1, width=10, showvalue=True, orient=HORIZONTAL, state='normal') cls.parm_yVar.grid(row=1, column=3) cls.parm_yVar.set(50) parm_frame.grid(row=1) initButtonLayout = classmethod(initButtonLayout) def updateAlpha(cls, *args): # reflects the value of alpha produced by varying the 1/alpha slider cls.alphaVar.set(1 / cls.reciprocalAlphaVar.get()) updateAlpha = classmethod(updateAlpha) def displayConvergedValues(cls, iterations_to_convergence): # function to display the window of converged values (no interesting stuff here, just more Tk faffing) # window housekeeping try: for child in cls.converged_values.winfo_children(): child.destroy() except: cls.converged_values = Toplevel(Model.master) cls.converged_values.title("""Converged steady-state values (%d iterations)""" %iterations_to_convergence) # print out the converged values in a window col = 0 for s in ["Patch (i)", "Pi", "Ci", "Si"]: Label(cls.converged_values, text=s).grid(row=0, column=col) col += 1 for patch in Patch.all_patches: col = 0 for var in [patch.id + 1, round(patch.occupation_probability, 5), round(patch.colonization_probability, 5), round(patch.connectivity, 5)]: e = Entry(cls.converged_values, state='disabled', width=10) Label(e, text=str(var)[0:10], width=10).grid() e.grid(row=patch.id+1, column=col) col += 1 displayConvergedValues = classmethod(displayConvergedValues) ############################################################################################################# ### End of Model class definition ### ############################################################################################################# ############################################################################################################# ### The following classes are for displaying the various windows - no interesting calculations are done, ### ### just a lot of messing about with the Tk interface. ### ############################################################################################################# class PatchEditor: # This is the class for the Patch Editor window. editor = None frame = None target_patch = None def __init__(self): pass def editPatch(cls, patch): try: cls.editor.config() except: cls.editor = Toplevel(Model.master) if cls.target_patch != patch: # destroy any existing patch editors try: cls.frame.destroy() except: None cls.frame = Frame(cls.editor) cls.frame.pack() cls.target_patch = patch # window title cls.editor.title("Patch Number " + str(patch.id + 1)) # slider to vary ln(Area) - use a log in order to get sensible look and feel - varying area itself # linearly gives a non-sensitive scale, so this is better. Scale(cls.frame, label="ln(Area)", from_=0.1, to=5.5, resolution=1e-3, width=10, showvalue=True, orient=HORIZONTAL, state='normal', variable = cls.target_patch.logAreaVar, command=cls.target_patch.draw).grid() # reflects the actual area produced by varying the ln(Area) slider Label(cls.frame, text="Area (square km)").grid() Entry(cls.frame, width=5, state='disabled', textvariable = cls.target_patch.areaVar, validate='all', validatecommand=cls.target_patch.draw).grid() # x co-ord Scale(cls.frame, label="x co-ord (km)", from_= 0, to= CANVAS_SIZE / CANVAS_SCALE_FACTOR, showvalue=True, resolution=0.1, digits=3, width=10, orient=HORIZONTAL, state='normal', variable = cls.target_patch.xVar, command=cls.target_patch.draw).grid() # y co-ord Scale(cls.frame, label="y co-ord (km)", from_= 0, to= CANVAS_SIZE / CANVAS_SCALE_FACTOR, showvalue=True, resolution=0.1, digits=3, width=10, orient=HORIZONTAL, state='normal', variable = cls.target_patch.yVar, command=cls.target_patch.draw).grid() # delete this patch! Button(cls.frame, text="delete patch", command=cls.deletePatch).grid() editPatch = classmethod(editPatch) def deletePatch(cls): # fuction to handle the delete button being pressed in the Patch Editor window # call deletePatch in the Model class which will do the right stuff Model.deletePatch(cls.target_patch) # refocus the Patch Editor to the next highest patch number (if possible) try: cls.editPatch(Patch.all_patches[len(Patch.all_patches)-1]) except: cls.frame.destroy() cls.editor.destroy() deletePatch = classmethod(deletePatch) class DistanceVar(DoubleVar): # a helper class which encapsulates a distance entry in the matrix # this cleverness is required in order for the numbers to change automatically # as you move the patches around distvars = [] def __init__(self, i, j, *args): self.i = i self.j = j DoubleVar.__init__(self, args) DistanceVar.distvars.append(self) self.calculate() def __del__(self): DistanceVar.distvars.remove(self) def refresh(cls): for d in cls.distvars: d.calculate() refresh = classmethod(refresh) def calculate(self, *args): try: self.set(Patch.distance(self.i, self.j)) except: # might get an exception if this is an obsolete Entry we're updating... # in which case we ought to have been garbage collected. Hmmmm. pass class DistanceMatrix: # Class to display the matrix of distances between each patch. matrix = None # create widget for DistanceMatrix def __init__(self): pass def undraw(cls): try: cls.matrix.destroy() cls.matrix = None except: pass undraw = classmethod(undraw) def draw(cls): # draw distance matrix N = len(Patch.all_patches) if N < 2: # if less than 2 patches, no need for a matrix as there are no distances to show! DistanceMatrix.undraw() else: # this will get the window if we haven't already try: cls.matrix.title("Inter-patch distance matrix") except: cls.matrix = Toplevel(Model.master) cls.matrix.title("Inter-patch distance matrix") # destroy any existing content for child in cls.matrix.winfo_children(): child.destroy() # patch labels for patch in Patch.all_patches: if patch.id > 0: Label(cls.matrix, text=patch.id+1).grid(row=0, column=patch.id+1, sticky="N") if patch.id < len(Patch.all_patches)-1: Label(cls.matrix, text=patch.id+1).grid(row=patch.id+1, column=patch.id+1, sticky="E") # create new Entry objects for r in range(N): for c in range(r+1,N): Entry(cls.matrix, width=10, state='disabled', textvariable=DistanceVar(r,c)).grid(row=r+1, column=c+1) draw = classmethod(draw) ############################################################################################################# ### End of boring Tk related classes. ### ############################################################################################################# # Phew, finally we can actually invoke the GUI! Model.initButtonLayout() # Layout the main window. mainloop() # Tell Tk to take over the GUI. # End of program.