• Q-learning bot for Bubble Shooter

    TL;DR: Here is some resulting gameplay from this Dueling Double Deep Q-learning network based bot:

    For a while I've wanted to explore Reinforcement Learning, and this time I've done so by building a Q-Learning based bot for the Bubble Shooter game - an Adobe Flash based game running in the browser.

    Bubble shooter

    The game works as follows: the game board contains bubbles of 6 different colors. Shooting a connected group of same colored bubbles with 3 or more bubbles in it destroys all of them. The goal of the game is to destroy all bubbles on the board.

    I chose the game for the simplicity of its UI and its 1-click gameplay, which was reasonably straightforward to automate.

    What is Q-Learning?

    You can learn about it in more detail by reading this Wikipedia article or watching Siraj Raval's explanatory video.

    In short, in Q-Learning, given a state, the agent tries to evaluate how good it is to take each possible action. The "Q" (or "Quality") value depends on the reward the agent expects to get by taking that action, as well as the rewards it expects in future states.

    E.g. maybe the agent shoots a bubble in one place and doesn’t destroy any bubbles - probably not a great move. Perhaps it shoots a bubble and manages to destroy 10 other bubbles - better! Or perhaps it does not destroy any bubbles on the first shot, but this enables the agent to destroy 20 bubbles with the second shot and win the game.

    The agent chooses its moves by calculating Q-values for every possible action, and choosing the one with the largest Q-value - the move it thinks will provide it with the most reward.

    For simple cases with few unique game states one can get away with a table of Q-values, but state-of-the-art architectures have switched to using Deep Neural Networks to learn and predict the Q-values.
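
    For illustration, tabular Q-learning boils down to a single update rule: nudge the Q-value of the taken action towards the received reward plus the discounted value of the best next action. A minimal sketch (not the deep network used for this bot):

    from collections import defaultdict

    # Q[state][action] -> expected discounted future reward
    Q = defaultdict(lambda: defaultdict(float))

    def q_update(state, action, reward, next_state, alpha=0.1, gamma=0.9):
        # Move Q(s, a) towards: reward + discounted value of the best next action
        best_future = max(Q[next_state].values(), default=0.0)
        Q[state][action] += alpha * (reward + gamma * best_future - Q[state][action])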

    Dueling Double Q Network architecture

    The simplest case is a Deep Q Network - a single deep neural net that attempts to predict the Q-value of every action. But it is prone to overestimating Q-values, resulting in the network thinking moves are better than they really are.

    An improvement on that was the Double Q Network (Arxiv paper: Deep Reinforcement Learning with Double Q-learning), which has two neural networks: an online one and a target one. The online network gets actively trained, while the target one doesn't, so the target network "lags behind" in its Q-value evaluations. The key is that the online network gets to pick the action, while the target network evaluates that action. The target network gets updated every so often by copying the weights over from the online network.
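
    As a rough sketch of that action selection / evaluation split (assuming Keras-style online and target models that predict Q-values for a batch of states, with rewards and dones as numpy arrays and dones encoded as 0/1):

    import numpy as np

    def double_q_targets(online_model, target_model, rewards, next_states, dones, gamma=0.9):
        # The online network picks the best next action...
        best_actions = np.argmax(online_model.predict(next_states), axis=1)
        # ...while the target network evaluates how good that action is.
        target_q = target_model.predict(next_states)
        chosen_q = target_q[np.arange(len(best_actions)), best_actions]
        # Terminal ("done") states contribute no future reward.
        return rewards + gamma * (1 - dones) * chosen_q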

    A further improvement still was the Dueling Q Network (Arxiv paper: Dueling Network Architectures for Deep Reinforcement Learning). The idea here is to split the network into 2 streams - advantage and value - the results of which are later combined into Q-values. This allows the network to learn which states are valuable without having to learn the effect of every action in each state, allowing it to generalize better.

    In the end I’ve ended up combining all of the techniques above into the following network:

    Model architecture

    It’s based on the architecture mentioned in one of the papers: several convolutional layers followed by several fully connected layers.
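
    The layer sizes below are assumptions rather than the exact ones used in this model, but a dueling network of that conv + dense shape could be sketched in Keras roughly like this:

    import tensorflow as tf
    from tensorflow.keras import layers

    def build_dueling_dqn(input_shape, num_actions, learning_rate=0.00025):
        inputs = layers.Input(shape=input_shape)
        x = layers.Conv2D(32, 3, activation='relu')(inputs)
        x = layers.Conv2D(64, 3, activation='relu')(x)
        x = layers.Flatten()(x)
        x = layers.Dense(256, activation='relu')(x)

        # Value stream: how good is this state overall?
        value = layers.Dense(1)(x)
        # Advantage stream: how much better is each action than the others?
        advantage = layers.Dense(num_actions)(x)

        # Combine the streams: Q(s, a) = V(s) + A(s, a) - mean(A(s, *))
        q_values = layers.Lambda(
            lambda streams: streams[0] + streams[1] - tf.reduce_mean(streams[1], axis=1, keepdims=True)
        )([value, advantage])

        model = tf.keras.Model(inputs=inputs, outputs=q_values)
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate), loss='mse')
        return model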

    Game states and rewards

    DeepMind were able to get their model to successfully play Atari 2600 games just by using game screenshots as the input to the neural net, but for me the network simply failed to learn anything. Well, aside from successfully ending the game in the fewest number of steps.

    So I ended up using Computer Vision to parse the game board into a 35x15 matrix. 35 ball halves fit horizontally, and 15 balls fit vertically into the game board, so every cell in the matrix represents half a ball horizontally and a full ball vertically.

    What the current agent actually sees though are 4 different snapshots of the game board:

    • Positions of half-balls that match the current ball’s color
    • Positions of half-balls that don’t match the current ball’s color
    • Positions of half-balls that match the next ball’s color
    • Positions of half-balls that don’t match the next ball’s color

    Model architecture

    The idea was to start by learning the game for just the current and the following ball colors, to simplify the rules. I tried having layers for balls of every color, but haven't been able to make that work yet.
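
    As a rough sketch (assuming the parsed board is a numpy matrix of color ids, with 0 meaning an empty cell), the 4 input planes could be assembled like this:

    import numpy as np

    def encode_state(board_colors, current_color, next_color):
        # board_colors: matrix of color ids per half-ball cell, 0 = empty
        occupied = board_colors > 0
        return np.stack([
            board_colors == current_color,               # matches the current ball's color
            occupied & (board_colors != current_color),  # occupied, but doesn't match the current ball
            board_colors == next_color,                  # matches the next ball's color
            occupied & (board_colors != next_color),     # occupied, but doesn't match the next ball
        ], axis=-1).astype(np.float32)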

    For rewards, I've settled on two values: destroying more than one ball gives a reward of 1, every other action gives a reward of 0. With other reward schemes I found that Q-values gradually grew to infinity.

    As for possible actions, the game board is 500x500 px, meaning there are 250,000 possible click targets for every state - that means lots of computation. To simplify, I've narrowed the action space down to 35 different actions, all placed on a single horizontal line ~100px from the bottom of the game board.

    This makes some further shots impossible due to lack of accuracy, but seems to be good enough to start with.
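
    For illustration, a hypothetical mapping from a discrete action index to a click position on the board could look like this (the exact offsets are assumptions):

    def action_to_click(action_index, board_width=500, board_height=500, num_actions=35):
        # Spread the actions evenly across a single horizontal line near the bottom of the board
        x = (action_index + 0.5) * board_width / num_actions
        y = board_height - 100
        return int(x), int(y)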

    Prioritized experience replay

    The first option is to start with no replay at all: the agent takes a step and learns from it only once. But it is a poor option, as important experiences may be forgotten, and adjacent states correlate strongly with each other.

    An improvement over that is to keep some recent experiences (e.g. 1 million of them), and to train the neural network on randomly sampled batches of experiences. This avoids the correlation of adjacent states, but important experiences may still end up being revisited rarely.

    A further improvement is to keep a buffer of prioritized experiences. The experiences are prioritized based on the Q-value error (the difference between the predicted and the actual Q-value), and the neural network is trained by sampling batches across different ranges of errors. That way error-prone but rarely occurring states have a better chance of being revisited during training.
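
    The core of the prioritization is tiny: the sampling weight grows with the error. A sketch (the epsilon and alpha here correspond to the "memory epsilon" and "memory alpha" hyperparameters listed further below):

    def priority(td_error, epsilon=0.01, alpha=0.6):
        # epsilon keeps zero-error experiences sampleable,
        # alpha controls how strongly errors skew the sampling
        return (abs(td_error) + epsilon) ** alpha

    # Experiences are then sampled with probability proportional to
    # priority(error_i) / sum of all priorities in the buffer.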

    I ended up using Jaromiru's reference implementation of Prioritized Experience Replay (SumTree, Memory), which is based on this article.

    An experience is a tuple of:

    • The current state
    • The action taken
    • The reward received
    • The next resulting state
    • Whether the episode is “done”

    Training in parallel

    Originally I started with just a single instance of the game for training, but Machine Learning really shines with lots and lots of data (millions, billions of samples - the more the better), so at 2-5 seconds per step it just took too long to get any substantial amount of training done.

    So I used Selenium+Firefox and Python's multiprocessing module to run 24 isolated game instances at once, topping out at about 4 steps/second. Even at this low number of instances it's quite CPU-heavy and makes my poor Ryzen 2700x struggle, making the games run slower.

    You can find an explanation on how it was done in my last post, or find the code on Github.

    Results

    The most successful hyperparameters so far were:

    • Exploration probability (epsilon): 0.99
    • Minimum exploration probability: 0.1, gradually reduced to 0.01 with every training session
    • Exploration probability decay per episode: 0.999, gradually reduced to 0.9 with every training session
    • Gamma: 0.9
    • Learning rate: 0.00025
    • Replay frequency: every 4 steps
    • Target network update frequency: every 1000 steps
    • Memory epsilon: 0.01
    • Memory alpha: 0.6
    • Experience memory buffer: 20000 experiences
    • Batch size: 32

    In total the model was trained about 6 different times for 200-300k steps - for ~24h each time.

    During training the model on average takes ~40 steps per episode (~70 max), and is able to destroy at least 2 bubbles ~19 times in an episode (~39 max).

    The resulting gameplay is not terribly impressive: the agent misses some clear shots even when there's nothing better to do. Its knowledge also seems to be concentrated close to the starting state - it doesn't do well with empty spaces closer to the top or bottom of the board. But it does play the game to some extent, and if the game's RNG is favorable it may even get pretty far.

    As a next step I might try different network architectures and hyperparameters, and, to speed things up, pretrain them offline on the bank of experiences acquired from this model's gameplay.

    If you’re interested in the code, you can find it on Github.


  • Parallel and isolated game agents for Reinforcement Machine Learning

    Many Reinforcement Learning tutorials use the OpenAI Gym environment and its supported games to try out different algorithms. But what if you want to use Reinforcement Learning for your own environment? Usually you will no longer have ready-built APIs to interact with the environment, and the environment itself may be slower than Gym's games, making it difficult to take the millions of steps necessary to train agent models.

    Lately I've been exploring RL for a Bubble Shooter game - an Adobe Flash based game running in the browser. Previously I would run the game in my primary browser and use the Pynput library to control my computer's mouse and/or keyboard. This works for a single game instance, but is slow - one step takes 2-5 seconds. So running multiple game instances is necessary to speed up learning, but the original approach of controlling the device's primary mouse or keyboard does not scale easily.

    What helped to speed up the learning process was using Selenium+Firefox to run isolated game instances - allowing straightforward controls for each instance without blocking the device's primary controls - and running every game instance and the RL agent in separate processes.

    Selenium provides some key functionality necessary for this to work:

    • An easy way to take screenshots of DOM elements for the Vision module to use
    • An easy way to send mouse/keyboard inputs to a specific DOM element
    • Support for running Flash based applications

    Game instance isolation with Selenium and Firefox

    The Selenium Python driver can be installed with pip install selenium. Firefox also requires geckodriver to be available in your $PATH: How to install geckodriver

    Flash is abandoned and modern browsers no longer include it. On Ubuntu, browser-plugin-freshplayer-pepperflash needs to be installed to get Flash running: sudo apt-get install browser-plugin-freshplayer-pepperflash.

    In the Selenium’s Firefox webdriver instance Flash can be enabled with a profile preference:

    from selenium import webdriver
    # ...
    profile = webdriver.FirefoxProfile()
    profile.set_preference("plugin.state.flash", 2)
    driver = webdriver.Firefox(profile)
    

    The webpage with the flash game can be opened with:

    driver.get('https://www.kongregate.com/games/Paulussss/bubbles-shooter')
    

    The correct DOM element can be found with:

    game_board_element = driver.find_element_by_tag_name('embed')
    

    Mouse movements and clicks are performed using the ActionChains mechanism. It's a convenient wrapper and will wait for actions to complete before returning:

    target_x, target_y = (50, 50) # some x, y coordinates
    action_chains = webdriver.common.action_chains.ActionChains(driver)
    action_chains.move_to_element_with_offset(game_board_element, target_x, target_y)
    action_chains.click()
    action_chains.perform()
    

    Game board screenshots can be taken with game_board_element.screenshot_as_png. This returns the PNG image as binary data, which we can turn into an OpenCV compatible Numpy array:

    import numpy as np
    import cv2
    from collections import namedtuple
    # ...
    png_bytes = game_board_element.screenshot_as_png
    file_bytes = np.asarray(bytearray(png_bytes), dtype=np.uint8)
    decoded_image = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
    rect = game_board_element.rect
    (x, y, w, h) = (int(rect['x']), int(rect['y']), int(rect['width']), int(rect['height']))
    cropped_image = decoded_image[y:y+h, x:x+w]
    
    Board = namedtuple('Board', ['x', 'y', 'w', 'h', 'screen'])
    
    board = Board(x, y, w, h, cropped_image)
    

    That finds the position of the game board, its dimensions and its contents - all that’s needed for further processing and interaction.

    Bubble shooter

    Parallelizing game instances with multiprocessing

    Using threads is an option, but because the goal is to run many game instances (24 in my case), I went with Processes instead to make use of all CPU cores.

    The distribution is as follows:

    • Main Supervisor, responsible for starting the agents and workers, managing queues
    • Agent process, responsible for running and training the RL model
    • N trainer/player processes, responsible for running Selenium and game instances.

    Inter-process communication will be performed through Queue instances, as inspired by Akka’s Actors.

    1. Agent worker

    import queue
    from multiprocessing import Process, Queue
    
    class AgentProcess:
      def __init__(self, config, my_queue, worker_queues):
        self.agent = Agent(config) # create our agent
        self.my_queue = my_queue
        self.worker_queues = worker_queues
    
      def send_message(self, worker, message):
        self.worker_queues[worker].put_nowait(message)
    
      def work(self):
        try:
          while True:
            message = self.my_queue.get(block=True, timeout=60)
            if message['command'] == 'act':
              self.send_message(message['worker'], self.agent.act(message['state']))
            else:
              pass # Add other Agent actions
        except queue.Empty:
          return False
    
    def agent_worker(config, my_queue, worker_queues):
      agent = AgentProcess(config, my_queue, worker_queues)
      return agent.work()
    

    The idea is that the Agent process will keep waiting for messages from the Player workers with new states of the game board. It will then ask the Agent model to pick a move, and respond back to the correct Player worker with the action it should take.

    In this case the Agent has one queue to receive messages from all Player workers, but every Player will have their own queues to receive messages from the Agent.

    Since processes are isolated, the Agent process will be able to have its own Tensorflow model instance and related resources.

    2. Player worker

    The player worker is responsible for running the game environment, and just needs to ask the Agent which action it should take at certain points.

    class PlayerProcess:
      def __init__(self, agent_queue, my_queue, worker_name):
        self.agent_queue = agent_queue
        self.my_queue = my_queue
        self.worker_name = worker_name
    
      def send_to_agent(self, message=None):
        message = message or {} # avoid a shared mutable default argument
        message['worker'] = self.worker_name
        self.agent_queue.put_nowait(message)
    
      def get_from_agent(self):
        return self.my_queue.get(block=True)
    
      def start(self):
        self.browser = SeleniumBrowser()
        self.browser.setup()
    
      def stop(self):
        self.browser.cleanup()
    
      def play(self, episodes, steps):
        for e in range(episodes):
          state = self.browser.get_game_board()
          for step in range(steps):
            self.send_to_agent({ 'command': 'act', 'state': state })
            action = self.get_from_agent()
    
            self.browser.move_to(action, 400)
            state = self.browser.get_game_board() # observe the board after the action
    
    def player_worker(config, agent_queue, my_queue, my_name):
      player = PlayerProcess(agent_queue, my_queue, my_name)
      player.start()
      player.play(config['episodes'], config['steps'])
      player.stop()
    

    This PlayerProcess is simplified to show the main ideas.

    In plain English, all this does is get the game board state from the Selenium instance, ask the Agent to choose an action based on that state, and ask the Selenium instance to perform the action.

    3. Training supervisor

    The training supervisor needs to start all of the relevant processes and queues. It also starts training and releases all resources afterwards.

    class Supervisor:
      def __init__(self, config, total_players=2):
        self.total_players = total_players
        self.agent_queue = Queue()
        self.worker_queues = [Queue() for _ in range(total_players)]
        self.config = config
        self.agent = Process(target=agent_worker, args=(config, self.agent_queue, self.worker_queues))
        self.workers = [Process(target=player_worker, args=(config, self.agent_queue, self.worker_queues[name], name)) for name in range(total_players)]
    
      def start(self):
        self.agent.start()
        for worker in self.workers:
          worker.start()
    
      def wait_to_finish(self):
        for worker in self.workers:
          worker.join()
        self.agent.terminate()
    

    And to actually run the supervisor, spin up all player instances and start the playing process:

    config = { 'episodes': 100, 'steps': 200 } # example config values; add whatever the Agent needs
    if __name__ == '__main__':
      supervisor = Supervisor(config, total_players=10)
      supervisor.start()
      supervisor.wait_to_finish()
    

    Once executed, this should start a bunch of browsers:

    Many browsers running

    I’m using this approach to run 24 game instances for my Q-learning process for Bubble Shooter. If you’d like to see it in action, you can find it on GitHub.


  • Solving a puzzle game with Z3 Theorem prover

    This post shows how Microsoft’s Z3 Theorem prover can be used to solve the Puzlogic puzzle game. We will replace the previous brute-force approach used in my puzlogic-bot with one that uses Z3.

    Over the last few posts we walked through building up puzlogic-bot: building the basic solver, the vision component using OpenCV, and hooking up mouse controls with Pynput. As I'm learning to use Z3, implementing a Z3-based puzzle solver seemed like an interesting practical exercise.

    TL;DR: you can find the final solver code on Github.

    Z3 is a theorem prover made by Microsoft. The SMT (Satisfiability Modulo Theories) family of solvers builds on the SAT (Boolean satisfiability problem) family of solvers, providing capabilities to deal with numbers, lists, bit vectors and so on, instead of just boolean formulas.

    Level 3

    Puzlogic is a number-based puzzle game. The goal of the game is to insert all given numbers into the board such that all rows and columns contain unique numbers and, where a target sum is given, the row or column sums up to that value.

    Basics of Z3

    “SAT/SMT by Example” by Dennis Yurichev is a great book to help beginners pick up Z3, and it was the inspiration for this post.

    At school we all worked through equation systems like:

    3x + 2y - z = 1
    2x - 2y + 4z = -2
    -x + 0.5y - z = 0
    

    Z3 can be thought of as a constraint solver. Given a set of constraints, it will try to find a set of variable values that satisfies them. If it can find a valid set of values, the constraints (and thus the theorem) are satisfiable. If not - they are not satisfiable.

    The most basic example given in Yurichev’s book is:

    from z3 import *
    
    x = Real('x')
    y = Real('y')
    z = Real('z')
    
    s = Solver()
    s.add(3*x + 2*y - z == 1)
    s.add(2*x - 2*y + 4*z == -2)
    s.add(-x + 0.5*y - z == 0)
    print(s.check())
    print(s.model())
    

    Executing it outputs:

    sat
    [z = -2, y = -2, x = 1]
    

    Here each equation can be considered a constraint: the variables have to take values that satisfy the equation. Z3 is way more powerful than that, as is evident from the Z3py reference, but for the Puzlogic puzzle this is actually sufficient.

    What we need next is a way to describe the given puzzle as a set of constraints for Z3.

    Solving Puzlogic puzzles

    Puzlogic - Level 6

    To quickly recap, the rules of the game are as follows:

    • All numbers on a given row or column must be unique
    • When a sum target is given for a row or column, all cells in it must sum up to the target sum
    • All purple pieces at the bottom of the game board must be used to fill the game board

    As established in the previous post, the game board is defined as a sparse matrix: a list of cells with x, y coordinates and a value. E.g.:

    board = [
      # (row, column, value)
      (0, 0, 0),
      (0, 3, 6),
      (1, 0, 2),
      (1, 1, 0),
      (1, 3, 0),
      (2, 0, 0),
      (2, 2, 5),
      (2, 3, 0),
      (3, 0, 0),
      (3, 3, 1)
    ]
    

    A value of zero is treated as an empty cell.

    Purple game pieces are expressed just as a list of numbers: [1, 2, 3, 4, 5, 6].

    While target sums can be expressed as (dimension, index, target_sum) tuples:

    target_sums = [
      # (dimension, index, target_sum)
      (0, 1, 12), # second row sums up to 12
      (0, 2, 10), # third row sums up to 10
      (1, 0, 11) # first column sums up to 11
    ]
    

    Dimension of 0 marks a row, while 1 marks a column.

    For the implementation there are 5 constraints to cover:

    1. Limit values of pre-filled cells to the given values. Otherwise Z3 would assign them to some other values.
    2. Allow empty cells to get values only of available pieces.
    3. Require that there would be no duplicate values in any of the rows or columns.
    4. If there are any sum targets specified for given rows or columns, require them to be satisfied.
    5. Require that each purple game piece would be used only once.

    0. Z3 variables

    First, the board is extended by creating a Z3 variable for each cell to make it easier to apply constraints later on:

    def cell_name(self, row, column):
      return 'c_%d_%d' % (row, column)
    
    def solve(self):
      solver = Solver()
    
      # Create z3 variables for each cell
      extended_board = [(row, column, value, Int(self.cell_name(row, column))) for (row, column, value) in board]
    

    Both default values and the purple game pieces are integers, hence the Int variable type.

    1. Pre-filled variables

    So far Z3 would be able to assign any integer to any of the variables we created. We will constrain the cells that already have a specified value to keep that value.

    As mentioned before, in the board sparse matrix the value of 0 indicates an empty cell, so we initialize values for cells with values greater than 0:

    def is_cell_empty(self, value):
      return value == 0
    
    def set_prefilled_cell_values(self, board):
      return [cell == value for (_, _, value, cell) in board if not self.is_cell_empty(value)]
    

    The cell == value is going to generate an expression object of type z3.z3.BoolRef, as cell here is a variable we created above with type Int, and value is a constant.

    E.g. for the cell (0, 3, 6) the expression would be Int('c_0_3') == 6.

    2. Empty cells can gain a value from any of the pieces

    Having available pieces [1, 2, 3, 4, 5, 6], we want the following constraint for each of the empty cells: cell == 1 OR cell == 2 OR cell == 3 OR cell == 4 OR cell == 5 OR cell == 6

    def set_possible_target_cell_values(self, board, pieces):
      constraints = []
    
      for (row, column, value, cell) in board:
        if self.is_cell_empty(value):
          any_of_the_piece_values = [cell == piece for piece in pieces]
          constraints.append(Or(*any_of_the_piece_values))
    
      return constraints
    

    This returns a list of constraints in the format:

    constraints = [
      Or(c_0_0 == 1, c_0_0 == 2, c_0_0 == 3, c_0_0 == 4, c_0_0 == 5, c_0_0 == 6),
      Or(c_1_1 == 1, c_1_1 == 2, c_1_1 == 3, c_1_1 == 4, c_1_1 == 5, c_1_1 == 6),
      # ...
    ]
    

    You may notice that this allows cells on the same row or column to have a common value. That will be fixed by constraint 3. It also allows the same piece to be used in multiple cells; we'll address that with constraint 5.

    3. No duplicates in rows or columns

    def require_unique_row_and_column_cells(self, board):
      return [
        c1 != c2
          for ((x1, y1, _, c1), (x2, y2, _, c2)) in itertools.combinations(board, 2)
          if x1 == x2 or y1 == y2]
    

    itertools.combinations(), given a list [A, B, C, ...], will output the pair combinations [AB, AC, BC, ...] without repetitions. The resulting constraints look like c_0_0 != c_0_3, c_0_0 != c_1_0, c_0_0 != c_2_0 and so on.

    We use it to make sure that no cells with a common row or column index have the same value. Instead of manually iterating over each cell combination, we could’ve used Distinct(args) expression for each row and column.
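
    For illustration, a hypothetical Distinct-based variant could group cells by row and by column (just a sketch, not what the bot uses):

    from z3 import Distinct

    def require_unique_rows_and_columns(self, board):
      constraints = []
      rows = set(row for (row, _, _, _) in board)
      columns = set(column for (_, column, _, _) in board)
      for r in rows:
        cells = [cell for (row, _, _, cell) in board if row == r]
        if len(cells) > 1:
          constraints.append(Distinct(*cells))
      for c in columns:
        cells = [cell for (_, column, _, cell) in board if column == c]
        if len(cells) > 1:
          constraints.append(Distinct(*cells))
      return constraints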

    4. Satisfying sum targets

    Any column or row may have a sum target that its pieces should sum up to.

      def match_sum_requirements(self, board, sum_requirements):
        constraints = []
        for (dimension, index, target_sum) in sum_requirements:
          relevant_cells = [cell[3] for cell in board if cell[dimension] == index]
          constraints.append(sum(relevant_cells) == target_sum)
    
        return constraints
    

    It's a fairly straightforward sum of the relevant cells. E.g. for the (0, 1, 12) sum target, the constraint expression would be 0 + c_1_0 + c_1_1 + c_1_3 == 12.

    5. All pieces are used once

    Here that can be ensured by summing up pieces used in empty cells, and it should match the sum of pieces given originally. Just checking for unique values is insufficient, as on some levels there may be multiple pieces with the same value, e.g. [4, 5, 6, 4, 5, 6].

      def target_cells_use_all_available_pieces(self, board, pieces):
        empty_cells = [cell for (_, _, value, cell) in board if self.is_cell_empty(value)]
        return [sum(empty_cells) == sum(pieces)]
    

    Putting it all together

    Now that we have methods that generate constraint expressions, we can add them all to the solver and format the output to make it look more readable:

    constraints = \
      self.set_prefilled_cell_values(extended_board) + \
      self.set_possible_target_cell_values(extended_board, pieces) + \
      self.require_unique_row_and_column_cells(extended_board) + \
      self.match_sum_requirements(extended_board, sum_requirements) + \
      self.target_cells_use_all_available_pieces(extended_board, pieces)
    
    for constraint in constraints:
      solver.add(constraint)
    
    if solver.check() == sat:
      model = solver.model()
      return [
        (row, column, model[cell].as_long())
          for (row, column, value, cell) in extended_board
          if self.is_cell_empty(value)
      ]
    else:
      return False
    

    It’s worth noting that solver.check() may return either z3.sat or z3.unsat. If it is satisfiable, solver.model() will contain a list of values:

    # solver.model()
    [c_1_1 = 6,
     c_3_0 = 5,
     c_2_3 = 2,
     c_1_3 = 4,
     c_0_0 = 1,
     c_2_0 = 3,
     c_3_3 = 1,
     c_2_2 = 5,
     c_1_0 = 2,
     c_0_3 = 6]
    

    The values, however, are still Z3 objects, so e.g. an Int() value may need to be converted into a primitive int via the as_long() method.

    Once a model is solved, we also nicely format the return values in the form of:

    solution = [
      # (row, column, expected_piece_value)
      (0, 0, 1),
      (1, 1, 6),
      (1, 3, 4),
      (2, 0, 3),
      (2, 3, 2),
      (3, 0, 5)
    ]
    

    These steps, if performed by a human, will solve the puzzle.

    The final set of constraints for level 6 was:

    [
      # 1. Pre-filled cell values
      c_0_3 == 6, c_1_0 == 2, c_2_2 == 5, c_3_3 == 1,
    
      # 2. Possible values for empty cells
      Or(c_0_0 == 1, c_0_0 == 2, c_0_0 == 3, c_0_0 == 4, c_0_0 == 5, c_0_0 == 6),
      Or(c_1_1 == 1, c_1_1 == 2, c_1_1 == 3, c_1_1 == 4, c_1_1 == 5, c_1_1 == 6),
      Or(c_1_3 == 1, c_1_3 == 2, c_1_3 == 3, c_1_3 == 4, c_1_3 == 5, c_1_3 == 6),
      Or(c_2_0 == 1, c_2_0 == 2, c_2_0 == 3, c_2_0 == 4, c_2_0 == 5, c_2_0 == 6),
      Or(c_2_3 == 1, c_2_3 == 2, c_2_3 == 3, c_2_3 == 4, c_2_3 == 5, c_2_3 == 6),
      Or(c_3_0 == 1, c_3_0 == 2, c_3_0 == 3, c_3_0 == 4, c_3_0 == 5, c_3_0 == 6),
    
      # 3. Unique values in rows and columns
      c_0_0 != c_0_3, c_0_0 != c_1_0, c_0_0 != c_2_0, c_0_0 != c_3_0,
      c_0_3 != c_1_3, c_0_3 != c_2_3, c_0_3 != c_3_3,
      c_1_0 != c_1_1, c_1_0 != c_1_3, c_1_0 != c_2_0, c_1_0 != c_3_0,
      c_1_1 != c_1_3,
      c_1_3 != c_2_3, c_1_3 != c_3_3,
      c_2_0 != c_2_2, c_2_0 != c_2_3, c_2_0 != c_3_0,
      c_2_2 != c_2_3,
      c_2_3 != c_3_3,
      c_3_0 != c_3_3,
    
      # 4. Row/column sum targets
      0 + c_1_0 + c_1_1 + c_1_3 == 12,
      0 + c_2_0 + c_2_2 + c_2_3 == 10,
      0 + c_0_0 + c_1_0 + c_2_0 + c_3_0 == 11,
    
      # 5. Ensuring that all pieces have been used once
      0 + c_0_0 + c_1_1 + c_1_3 + c_2_0 + c_2_3 + c_3_0 == 21
    ]
    

    Final thoughts

    The puzlogic-bot project is able to use the Z3 based solver only for the first few levels, but that's because the Vision component is not able to read the row/column target sums correctly. As we have seen in the example, the Z3-based solver itself has no issues dealing with the target sums of further levels.

    I found it rather surprising just how approachable Z3 was for a newbie. The puzzle was perhaps a primitive problem, but the only difficulty was not getting lost in the constraint definitions the puzzle required.

    You can find the final code of the solver on Github. If you are curious how puzlogic-bot was built, I have a series of posts on it too.

    How do you use or would use constraint solvers?


  • Debugging TLA+ specifications with state dumps

    TLA+ (Temporal Logic of Actions) is a formal specification language, used for specifying and checking systems and algorithms for correctness. As I'm exploring it further (see my last post about modelling subscription throttling), I'm occasionally running into situations where I think there may be an issue with the current specification, as model checking unexpectedly does not terminate, but without a clear reason.

    In programming languages it is common to fire up a debugger and step through the program execution, inspecting instruction flow and variable values. With TLC there is no clear way to do so, or at least I was unable to find one. TLC will provide an error trace for errors it does find; however, if TLC execution does not terminate, that is of little use.

    In the TLC command-line options I found the -dump <file> option, which dumps every reached state into the specified file. There is no equivalent GUI option in the TLA+ Toolbox's TLC options, so it has to be specified as a TLC command line parameter in Model > Advanced Options > TLC Options > TLC command line parameters.

    TLC command line parameters

    The dump file itself consists of plaintext state values, so it is certainly human readable. For a tautology-checking spec run with -dump states.txt:

    VARIABLES P, Q
    
    F1(A, B) == A => B
    F2(A, B) == ~A \/ B
    
    ValuesEquivalent == F1(P, Q) <=> F2(P, Q)
    
    Init ==
        /\ P \in BOOLEAN
        /\ Q \in BOOLEAN
        
    Next ==
        /\ P' \in BOOLEAN
        /\ Q' \in BOOLEAN
        
    Spec ==
        Init /\ [][Next]_<<P, Q>>
    

    The dump file in <SPEC_PATH>/<SPEC_NAME>.toolbox/<MODEL_NAME>/states.txt.dump contains:

    State 1:
    /\ P = FALSE
    /\ Q = FALSE
    
    State 2:
    /\ P = FALSE
    /\ Q = TRUE
    
    State 3:
    /\ P = TRUE
    /\ Q = FALSE
    
    State 4:
    /\ P = TRUE
    /\ Q = TRUE
    

    It's a low-tech approach, and it is still difficult to follow execution in cases with many states, but it has helped me get a better grasp of which states TLC is exploring, and to confirm or rule out suspected issues. One thing to note: depending on the size of a state and the number of states, the file size may quickly grow into the gigabytes range, so in certain cases it may be impractical to dump all distinct states, requiring early termination of TLC.

    This is a very basic method, but if you have better ways of debugging specs themselves - please do let me know!


  • Adding controls and bot logic - Part 3 of Solving Puzlogic puzzle game with Python and OpenCV

    In the second part of the series we added the Vision component for the Puzlogic puzzle solving bot.

    In this post we will add a mouse controller as well as handle basic bot logic to get the bot actually solve some of the beginner puzzles.

    Planning

    For controls, as before in the Burrito Bison bot, we only need the mouse here. So I'll reuse that component, which uses Pynput for mouse controls.

    For the bot logic, we'll need to use the Solver, Vision and Controls components, making use of all 3 to solve a given puzzle. In this case the bot will rely on a human to set up the game level and handle the game UIs, while the bot itself will be responsible only for finding the right solution and moving game pieces to the target cells.

    Implementation

    Mouse Controller

    As mentioned before, we’ll use Pynput for mouse controls. We start with the basic outline:

    import time
    from pynput.mouse import Button, Controller as MouseController
    
    class Controller:
        def __init__(self):
            self.mouse = MouseController()
    
        def move_mouse(self, x, y):
            # ...
    
        def left_mouse_drag(self, start, end):
            # ...
    

    For move_mouse(x, y), it needs to move the mouse to the given coordinate on screen.

    Pynput is able to move the mouse in a single frame just by changing the mouse.position attribute, but for me that caused problems in the past, as the game would simply not keep up and might handle such mouse movements unpredictably (e.g. not registering them at all, or registering them only partially). And in a way that makes sense: human mouse movements normally take several hundred milliseconds, not under 1ms.

    To mimic such gestures I’ve added a way to smooth out the mouse movement over some time period, e.g. 100ms, by taking a step every so often (e.g. every 2.5ms in 40 steps).

    def move_mouse(self, x, y):
        def set_mouse_position(x, y):
            self.mouse.position = (int(x), int(y))
        def smooth_move_mouse(from_x, from_y, to_x, to_y, speed=0.1):
            steps = 40
            sleep_per_step = speed / steps
            x_delta = (to_x - from_x) / steps
            y_delta = (to_y - from_y) / steps
            for step in range(steps):
                new_x = x_delta * (step + 1) + from_x
                new_y = y_delta * (step + 1) + from_y
                set_mouse_position(new_x, new_y)
                time.sleep(sleep_per_step)
        return smooth_move_mouse(
            self.mouse.position[0],
            self.mouse.position[1],
            x,
            y
        )
    

    The second necessary operation is mouse dragging, which means pressing the left mouse button down, moving the mouse to the right position and releasing the left mouse button. Again, all those steps could be performed programmatically in under 1ms, but not all games can keep up, so we'll spread the gesture out over roughly a second.

    def left_mouse_drag(self, start, end):
        delay = 0.2
        self.move_mouse(*start)
        time.sleep(delay)
        self.mouse.press(Button.left)
        time.sleep(delay)
        self.move_mouse(*end)
        time.sleep(delay)
        self.mouse.release(Button.left)
        time.sleep(delay)
    

    And that should be sufficient for the bot to reasonably interact with the game.
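
    As a quick usage example (the coordinates here are made up):

    controls = Controller()
    # Drag whatever is at (120, 640) onto the cell at (180, 320)
    controls.left_mouse_drag((120, 640), (180, 320))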

    Bot logic

    The bot component will need to coordinate the other 3 components. For the bot logic there are 2 necessary steps:

    1. The Solver component needs to get information from Vision about the available cells, their contents and the available pieces, in order for the Solver to provide a solution.
    2. Take the solution from Solver, and map its results into real-life actions, by moving the mouse via Controls to the right positions.

    We start by defining the structure:

    class Bot:
        """ Needs to map vision coordinates to solver coordinates """
    
        def __init__(self, vision, controls, solver):
            self.vision = vision
            self.controls = controls
            self.solver = solver
    

    Then we need a way to feed information to the Solver. But we can't just feed vision information straight into the solver (at least not in the current setup), as the two work with slightly differently structured data. So we first need to map vision information into structures that the Solver can understand.

    def get_moves(self):
        return self.solver.solve(self.get_board(), self.get_pieces(), self.get_constraints())
    

    self.vision.get_cells() returns cell information as a list of cells in the format Cell(x, y, width, height, content), but the Solver expects a list of cells in the format (row, column, piece):

    def get_board(self):
        """ Prepares vision cells for solver """
        cells = self.vision.get_cells()
    
        return list(map(lambda c: (c.x, c.y, c.content), cells))
    

    The Solver expects a list of integers representing the available pieces. However, Vision returns them as Cell(x, y, width, height, content) items. So we need to map those as well in self.get_pieces():

    def get_pieces(self):
        """ Prepares vision pieces for solver """
        return list(map(lambda p: p.content, self.vision.get_pieces()))
    

    Constraint detection is currently not implemented in the Vision component, so self.vision.get_constraints() returns an empty list []. We can just pass that along to the Solver unchanged for now, but will likely have to change it once constraints are implemented.

    def get_constraints(self):
        """ Prepares vision constraints for solver """
        return self.vision.get_constraints()
    

    Now that we have the solution in the form [(target_row, target_column, target_piece_value), ...], we need to map that back into something that could work for the graphical representation. We already treat x and y of each cell as the “row” and “column”, which works because all cells are arranged in a grid anyway. Now we only need to find which available pieces to take from which cell, and move those to respective target cells.

    def do_moves(self):
        moves = self.get_moves()
        board = self.vision.get_game_board()
        available_pieces = self.vision.get_pieces()
    
        def get_available_piece(piece, pieces):
            target = list(filter(lambda p: p.content == piece, pieces))[0]
            remaining_pieces = list(filter(lambda p: p != target, pieces))
            return (target, remaining_pieces)
    
        for (to_x, to_y, required_piece) in moves:
            (piece, available_pieces) = get_available_piece(required_piece, available_pieces)
    
            # Offset of the game screen within a window + offset of the cell + center of the cell
            move_from = (board.x + piece.x + piece.w/2, board.y + piece.y + piece.h/2)
            move_to = (board.x + to_x + piece.w/2, board.y + to_y + piece.h/2)
            print('Moving', move_from, move_to)
    
            self.controls.left_mouse_drag(
                move_from,
                move_to
            )
    

    get_available_piece takes the first matching piece from the vision's set of pieces, and uses that as the source piece for the move.

    As for handling coordinates, one thing not to forget is that coordinates point to the top left corner of the cell, and are not always absolute in relation to the OS screen. E.g. a piece's x coordinate is relative to the game screen's x coordinate, so to find its absolute coordinate we need to sum the two: absolute_x = board.x + piece.x.

    The top left corner of the piece is also not always usable - the game may not respond to clicks there. However, targeting the center of the cell works reliably. So we offset the absolute coordinate by half of the cell's width or height: absolute_x = board.x + piece.x + piece.width / 2.
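
    As a sketch, a hypothetical helper doing that calculation could look like this:

    def cell_center(board, cell):
        # board.x/board.y: offset of the game screen within the OS screen,
        # cell.x/cell.y/cell.w/cell.h: cell position and size relative to the game screen
        return (board.x + cell.x + cell.w / 2, board.y + cell.y + cell.h / 2)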

    Finally running the bot!

    Now we have all the tools for the bot to solve basic puzzles of Puzlogic. The last remaining piece is to build a run.py script putting it all together:

    from puzbot.vision import ScreenshotSource, Vision
    from puzbot.bot import Bot
    from puzbot.solver import Solver
    from puzbot.controls import Controller
    
    source = ScreenshotSource()
    vision = Vision(source)
    solver = Solver()
    controller = Controller()
    bot = Bot(vision, controller, solver)
    
    print('Checking out the game board')
    bot.refresh()
    
    print('Game board:', vision.get_cells())
    print('Game pieces:', vision.get_pieces())
    
    print('Calculating the solution')
    print('Suggested solution:', bot.get_moves())
    print('Performing the solution')
    bot.do_moves()
    

    Now we can run the bot with python run.py, making sure that the game window is also visible on the screen. Here’s a demo video:

    You can see that the bot needs human help to get through the interfaces, and only solves the puzzles themselves. Even then, it is able to solve the first 4 basic puzzles, and only by chance solves the 5th one, as that level contains sum constraints - right now the bot cannot detect sum constraints in vision. So it does fail on the 6th puzzle.

    Full code mentioned in this post can be found on Github: flakas/puzlogic-bot - bot.py, flakas/puzlogic-bot - controls.py.

    If you are interested in the full project, you can also find it on Github: flakas/puzlogic-bot.