Asynchronous SQL


Asynchronous SQL can seem like a daunting task if you are not using an asynchronous library. These libraries often add a lot of overhead to an existing application and force you to do things their way. So let's look at how to implement asynchronous SQL queries in your existing PHP code from scratch, using nothing but mysqli and the mysqlnd driver.

If you like the idea of this but don't want to build the classes yourself, feel free to use the Composer library I created while researching this topic. You can install it with Composer as johncurt/async-mysql.

Why use asynchronous queries

Obviously, using asynchronous queries is not going to be advantageous in every situation. There are two main reasons you would want this capability:

  1. When your code runs inside an asynchronous event loop, as with WebSockets, and can't afford to sit and wait for a query to finish while there are other things to do.
  2. When a report is being tabulated in PHP from multiple queries that each take a significant amount of time, especially if these queries are on separate SQL servers. In this case, having two or more DB servers running queries at the same time can drastically improve performance. If the queries take 2 seconds, 2 seconds, and 3 seconds to run, the user would traditionally be waiting a full 7 seconds plus other overhead for the page/report to load. By running these queries asynchronously, the wait is reduced to roughly that of the slowest query: 3 seconds.
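The arithmetic behind the "slowest query" claim is easy to model. Run sequentially, the waits add up. Run concurrently on separate connections, the total is bounded below by the longest query. This is a simplified best-case model that ignores connection setup, network latency, and contention on a shared server:

```php
<?php
// Simplified timing model: per-query durations in seconds (the example from the list above).
$durations = [2, 2, 3];

// Sequential: each query waits for the previous one, so the durations add up.
$sequential = array_sum($durations);

// Concurrent (one connection per query): the page waits only for the slowest query.
// Best case only: ignores connection overhead and server contention.
$concurrent = max($durations);

printf("sequential: %ds, concurrent: %ds\n", $sequential, $concurrent); // sequential: 7s, concurrent: 3s
```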

You may remember that I challenged my readers to come up with a way to make DB calls asynchronously when discussing WebSockets and Ratchet. There are ReactPHP and Amp libraries that integrate easily with those async systems, but they do require you to learn their own system of Promises and conventions. Implementing this concept yourself, however, is not overly difficult, regardless of which of these two reasons you have for doing it.

Overview

To run queries and keep track of them, we need a class that manages them. Let's call it the ConnectionManager. It accepts Query objects and gives each one its own connection to the database, because a single MySQL connection can only have one query in flight at a time. If you will be using different databases for the various queries, the connection details need to be part of the Query class. To keep things simple for starters, however, we can store the connection data at the ConnectionManager level and have it open a new connection to the server for each query. For purely synchronous queries this adds unneeded overhead, so this approach is only worthwhile when you are doing something asynchronous.

Query

The Query class could be built in a variety of ways, but at its core it needs to hold the actual query that will be run as well as callbacks for both a successful run and a failure. Let's instantiate it with these three things and let the connection manager drive the rest of the class's methods. In addition to storing these, the class needs a method to get the query and methods the ConnectionManager can call on success/failure. Beyond that, we need a good way to tie each Query to the mysqli connection that is running it, so the ConnectionManager can run the correct Query's callback on success/failure. A simple GUID does the job. Lastly, we'll want some boolean flags to keep track of things like whether or not the Query has run.
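A version-4 GUID is just 16 random bytes with six bits pinned: four for the version and two for the variant. The sketch below pulls the formatting step into a standalone function so you can see what the bit masks do to fixed input. The name formatGuidV4() is hypothetical, and this is an illustration rather than the library's code:

```php
<?php
/**
 * Formats 16 bytes as an RFC 4122 version-4 GUID using the usual
 * version/variant bit masks. Hypothetical helper name, for illustration.
 */
function formatGuidV4(string $bytes): string {
    if (strlen($bytes) !== 16) {
        throw new InvalidArgumentException('Expected exactly 16 bytes.');
    }
    // Byte 6: keep the low nibble, force the high nibble to 0100 (version 4).
    $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
    // Byte 8: keep the low 6 bits, force the top two bits to 10 (RFC 4122 variant).
    $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);
    // 32 hex chars split into 4-char chunks, then grouped 8-4-4-4-12.
    return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
}

// Fixed inputs make the forced bits visible.
echo formatGuidV4(str_repeat("\x00", 16)), "\n"; // 00000000-0000-4000-8000-000000000000
echo formatGuidV4(str_repeat("\xff", 16)), "\n"; // ffffffff-ffff-4fff-bfff-ffffffffffff

// In practice, feed it cryptographically secure random bytes (random_bytes() is built into PHP 7+).
echo formatGuidV4(random_bytes(16)), "\n";
```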

So, instantiating it will be relatively simple:

	/**
	 * Query constructor.
	 *
	 * @param string        $query
	 * @param callable|null $success called with the result when the query succeeds
	 * @param callable|null $failure called with the error message when the query fails
	 */
	public function __construct(string $query, $success=null, $failure=null) {
		$this->success = $success; //no & needed: the callable is simply stored
		$this->failure = $failure;
		$this->query = $query;
		$this->guid = $this->makeGUIDv4();
		$this->resultExists = false;
		$this->done = false;
	}

	/**
	 * Generates a version-4 GUID for tracking links/queries together.
	 * @return string
	 */
	private function makeGUIDv4(){
		//random_bytes() is built into PHP 7+; openssl_random_pseudo_bytes() needs the OpenSSL extension.
		$data = random_bytes(16);
		$data[6] = chr(ord($data[6]) & 0x0f | 0x40); // set version to 0100
		$data[8] = chr(ord($data[8]) & 0x3f | 0x80); // set bits 6-7 to 10
		return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($data), 4));
	}

 

All the constructor needs to do is store the incoming settings and make the GUID. The rest of the methods are either getters or there to call the success/failure callbacks. You'll notice that I included a default of null on the callbacks. This makes it easy to tell when there is no callback to run. Maybe the query is an INSERT or UPDATE statement and we don't need to do anything with the result.

 

	/**
	 * Called by the ConnectionManager when the query succeeds.
	 * @param \mysqli_result|bool $result
	 */
	public function success($result){
		$this->resultExists = true;
		$this->result = $result;
		$this->done = true;
		if (is_callable($this->success)){ //is_callable(null) is false, so a missing callback is skipped
			($this->success)($result);
		}
	}

	/**
	 * Called by the ConnectionManager when the query fails.
	 * @param string $error
	 */
	public function failure($error){
		$this->resultExists = false;
		$this->error = $error;
		$this->done = true;
		if (is_callable($this->failure)){
			($this->failure)($error);
		}
	}

These methods simply run the stored callback, passing in the result or error message. They also set the boolean flags that may be useful for determining state. At this point we have a great class for storing the query and running callbacks on success/failure. Again, if you want to use multiple DB servers, you will need to extend this Query class with a way to create its own DB connection as well. You can find a full example of this Query class at https://github.com/johncurt/async-mysql/blob/master/src/Query.php.
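To see the callback flow in isolation, here is a condensed, database-free sketch of a Query-style class. It is not the library's class: the name SketchQuery and the isDone(), hasResult(), and getError() getters are hypothetical. The GUID is replaced by a plain random hex key, which is enough for matching. The success() and failure() calls at the end simulate what the connection manager would do after reaping:

```php
<?php
/**
 * Condensed sketch of the Query class described above (hypothetical name and getters).
 */
class SketchQuery {
    /** @var callable|null */
    private $success;
    /** @var callable|null */
    private $failure;
    private $query;
    private $guid;
    private $done = false;
    private $resultExists = false;
    private $result = null;
    private $error = null;

    public function __construct(string $query, ?callable $success = null, ?callable $failure = null) {
        $this->query = $query;
        $this->success = $success;
        $this->failure = $failure;
        // Any unique key works for matching links to queries; a random hex string is used here.
        $this->guid = bin2hex(random_bytes(16));
    }

    public function getQuery(): string { return $this->query; }
    public function getGuid(): string { return $this->guid; }
    public function isDone(): bool { return $this->done; }
    public function hasResult(): bool { return $this->resultExists; }
    public function getError(): ?string { return $this->error; }

    // Called once the query has been reaped successfully.
    public function success($result): void {
        $this->done = true;
        $this->resultExists = true;
        $this->result = $result;
        if ($this->success !== null) {
            ($this->success)($result); // PHP 7 can call a callable stored in a property this way
        }
    }

    // Called when the query failed.
    public function failure(string $error): void {
        $this->done = true;
        $this->resultExists = false;
        $this->error = $error;
        if ($this->failure !== null) {
            ($this->failure)($error);
        }
    }
}

// Usage: simulate what the connection manager does after reaping.
$captured = null;
$q = new SketchQuery('SELECT 1 AS num', function ($result) use (&$captured) {
    $captured = $result;
});
$q->success(['num' => 1]); // a plain array stands in for a mysqli_result

$update = new SketchQuery('UPDATE t SET x = 1'); // no callbacks registered
$update->failure('simulated error');             // safe: the null callback is simply skipped
```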

Connection Manager

The connection manager needs to manage two parallel arrays: one of Query objects and the other of mysqli connections (links). Both arrays are keyed by the Query's GUID, so a connection and its Query can easily be matched up. When instantiating the manager, we pass in all the default connection information. Then, whenever a query is added, the manager creates the corresponding connection and sends the query asynchronously.
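Keying both arrays by the same GUID turns "which query does this connection belong to?" into a single array lookup. The sketch below uses stdClass stand-ins for the Query and mysqli objects so it runs without a database. It also illustrates that storing objects needs no &: PHP objects are passed by handle, so updating a query through the array changes the same object the caller holds:

```php
<?php
// Hypothetical stand-ins: stdClass replaces the real Query and mysqli objects.
$queries = [];
$links   = [];

$query = new stdClass();
$query->status = 'pending';

$guid = bin2hex(random_bytes(16));
$queries[$guid] = $query;          // no & needed: PHP objects are passed by handle
$links[$guid]   = new stdClass();  // the connection running this query

// Later, a finished link is found by its key, and the same key finds its query.
foreach ($links as $key => $link) {
    $queries[$key]->status = 'done';   // mutates the very object the caller holds
    unset($links[$key], $queries[$key]);
}

echo $query->status, "\n"; // done
```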

	/**
	 * ConnectionManager constructor. Takes the same parameters as mysqli, which are used to create
	 * connections for all the async queries to run in.
	 *
	 * @param string|null $host
	 * @param string|null $username
	 * @param string|null $passwd
	 * @param string|null $dbname
	 * @param int|null    $port
	 * @param string|null $socket
	 */
	public function __construct($host=null,
	                            $username=null,
	                            $passwd=null,
	                            $dbname=null,
	                            $port=null,
	                            $socket=null){
		//set defaults:
		$host     = $host     ?? ini_get("mysqli.default_host");
		$username = $username ?? ini_get("mysqli.default_user");
		$passwd   = $passwd   ?? ini_get("mysqli.default_pw");
		$dbname   = $dbname   ?? '';
		$port     = $port     ?? (int) ini_get("mysqli.default_port");
		$socket   = $socket   ?? ini_get("mysqli.default_socket");

		//test the connection once up front (mysqli may warn or throw, depending on mysqli_report())
		try {
			$mysqli = new \mysqli($host,
				$username,
				$passwd,
				$dbname,
				$port,
				$socket);
		} catch (\Exception $e){
			throw new \InvalidArgumentException('Unable to connect to this MySQL endpoint.', 0, $e);
		}
		if ($mysqli->connect_error){
			throw new \InvalidArgumentException('Unable to connect to this MySQL endpoint.');
		}
		$mysqli->close(); //the test link isn't needed; each query gets its own connection

		//save the data for use when creating a link as queries are added
		$this->mysqliArgs = [
			$host,
			$username,
			$passwd,
			$dbname,
			$port,
			$socket,
		];
	}

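	/**
	 * Adds the query to the queue and starts running it.
	 *
	 * @param \JohnCurt\AsyncMySQL\Query $query
	 * @return bool true if the query was sent, false on error
	 */
	public function runQuery(Query $query){ //objects are passed by handle, so no & is needed
		$queryGuid = $query->getGuid();
		try {
			$link = new \mysqli(...$this->mysqliArgs); //turn array into args
			if ($link->connect_errno){
				return false; //could not open a connection for this query
			}
			//MYSQLI_ASYNC sends the query and returns immediately; false means it could not be sent
			if ($link->query($query->getQuery(), MYSQLI_ASYNC) === false){
				$link->close();
				return false;
			}
		} catch (\Exception $e){
			//mysqli may throw mysqli_sql_exception, depending on mysqli_report() settings
			return false;
		}
		//only track the query once it has actually been sent
		$this->queries[$queryGuid] = $query;
		$this->links[$queryGuid] = $link;
		return true;
	}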

You'll notice the use of the null coalescing operator ??, introduced in PHP 7. It takes the value on the left unless that value is null (or undefined), in which case it takes the value on the right. These lines are equivalent to $host = isset($host) ? $host : ini_get("mysqli.default_host"); and would need to be rewritten in this less succinct style for PHP 5.
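One consequence is worth knowing. ?? replaces only null (or undefined) values, so an empty string such as the $dbname default of '' survives it. The shorter ?: operator (available since PHP 5.3) tests truthiness instead and would replace that empty string. A quick standalone check, with illustrative variable names:

```php
<?php
$dbname = '';
$port   = null;

$a = $dbname ?? 'fallback';                  // '' is not null, so it is kept
$b = isset($dbname) ? $dbname : 'fallback';  // PHP 5 equivalent of ??: same result
$c = $dbname ?: 'fallback';                  // ?: tests truthiness, so '' is replaced

$p = $port ?? 3306;                          // null, so the fallback is used

var_dump($a, $b, $c, $p);
```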

When running a query, the ConnectionManager opens a new connection and immediately sends the query to the server with the MYSQLI_ASYNC flag (available only with mysqlnd). This flag tells mysqlnd not to wait for the result and to return control to the script right away. Opening the connection itself is still a blocking call. If anything goes wrong, the method cleans up and returns false to indicate the problem.

Many queries can be added, and each one gets its own connection and then waits for us to check for results. This checking process is where the PHP docs are a little lacking, and there is much confusion in the community about how to properly check for finished queries. To find the connections that are ready to be "reaped," PHP offers the mysqli_poll() function. It takes three arrays by reference ($read, $error, and $reject) plus a timeout in seconds and microseconds. Before the call, all three arrays should be set to the full list of connections. The function then modifies them, removing from each array the links that don't belong in it. Personally, I find very little benefit in the resulting arrays, and would love for you to comment below if you have some ideas about how to make use of them. What matters is that mysqli_poll() returns the number of connections that are ready to be reaped (or false on failure). Since you have to reap rejected/errored connections as well as ones returning a valid result, I chose to iterate over the full list of links whenever the poll reports that some need attention.

To actually pull in the data from an asynchronous request, call mysqli_reap_async_query() on the link. It returns a mysqli_result for queries that produce a result set (SELECT, SHOW, and so on) and true for successful queries that don't (INSERT, UPDATE, and so on). It returns false on error, or false if the query isn't ready to be reaped! See the problem? There are two reasons we might receive false, so we also need to check mysqli::$error to tell them apart.
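The decision logic can be isolated in a small pure function, which makes the cases easy to test without a server. This sketch follows the reasoning described above. classifyReap() is a hypothetical name, and in the usage lines a stdClass object stands in for a mysqli_result:

```php
<?php
/**
 * Classifies the outcome of mysqli_reap_async_query(), given its return value and
 * the link's error string afterwards. Hypothetical helper for illustration.
 *
 * @param mixed  $result mysqli_result, true (successful non-SELECT) or false
 * @param string $error  value of $link->error after the reap attempt
 * @return string 'success', 'failure' or 'pending'
 */
function classifyReap($result, string $error): string {
    if ($result !== false) {
        // A result set (SELECT, SHOW, ...) or true (INSERT, UPDATE, ...) both mean success.
        return 'success';
    }
    // false with an error message is a failed query; false without one is treated as "not ready yet".
    return $error !== '' ? 'failure' : 'pending';
}

echo classifyReap(true, ''), "\n";            // success (e.g. an UPDATE)
echo classifyReap(new stdClass(), ''), "\n";  // success (stand-in for a mysqli_result)
echo classifyReap(false, 'boom'), "\n";       // failure
echo classifyReap(false, ''), "\n";           // pending
```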

	/**
	 * Basic "tick" function to check for finished queries and call their callbacks.
	 */
	public function reapAny(){
		if (count($this->links) === 0){
			return; //nothing to poll; avoid calling mysqli_poll() with empty arrays
		}
		$read = $error = $reject = $this->links;
		$count = mysqli_poll($read, $error, $reject, 0, 0); //zero timeout: check and return immediately
		if ($count>0){
			//have to reap everything regardless, so if anything is available - just try to reap any/all.
			foreach ($this->links as $guid=>$link){
				$this->reap($guid, $link);
			}
		}
	}

	...

	/**
	 * @param string  $guid
	 * @param \mysqli $link
	 */
	private function reap(string $guid, \mysqli $link){
		$result = mysqli_reap_async_query($link);
		if ($result!==false){
			//a mysqli_result (SELECT etc.) or true (INSERT/UPDATE etc.) both mean success
			$this->queries[$guid]->success($result);
			$this->removeByGuid($guid);
		} else if ($link->error!==''){
			//false plus an error message: the query failed
			$this->queries[$guid]->failure($link->error);
			$this->removeByGuid($guid);
		}
		//false with no error: not ready yet - try again on the next tick
	}

	/**
	 * @param string $guid
	 */
	private function removeByGuid(string $guid){
		$this->links[$guid]->close();
		unset($this->links[$guid], $this->queries[$guid]);
	}

One additional method we will probably want is a blocking call that waits for all the queries to finish running. This method is less useful when using an asynchronous framework, but very helpful if all you are doing is loading up some queries that can run simultaneously to save some time in the end.

	/**
	 * Blocking call to wait (up to $timeoutInSeconds) to get all the current queries in the pool.
	 * A timeout of 0 means wait indefinitely.
	 *
	 * @param int|float $timeoutInSeconds
	 * @return bool true if all queries were reaped, false on timeout
	 */
	public function reapAll($timeoutInSeconds=0){
		$startTime = microtime(true);
		$curSecs = 0.0;
		//note: 0 === 0.0 is false in PHP, so test for "no timeout" with <= 0 rather than === 0.0
		while (count($this->links)>0 && ($timeoutInSeconds<=0 || $timeoutInSeconds>$curSecs)) {
			$curSecs = microtime(true) - $startTime;
			$this->reapAny();
		}
		return (count($this->links)===0); // false if it timed out before reaping the last one!
	}

You can find the full ConnectionManager class alongside Query.php in the repository's src directory: https://github.com/johncurt/async-mysql/tree/master/src.
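The timeout handling is the easiest part of reapAll() to get subtly wrong. In PHP, 0 === 0.0 is false because === compares types as well as values, so an untyped parameter defaulting to the integer 0 never matches a check against 0.0. Here is a database-free sketch of the same wait loop. waitForAll() is a hypothetical name, and $pending and $tick stand in for count($this->links) and reapAny(). The "> 0" check and the short usleep() that keeps the loop from spinning a CPU core are suggestions, not the library's code:

```php
<?php
/**
 * Calls $tick until $pending() reports zero outstanding items, or until
 * $timeoutInSeconds elapses. A timeout of 0 means "wait forever".
 */
function waitForAll(callable $pending, callable $tick, float $timeoutInSeconds = 0.0): bool {
    $start = microtime(true);
    while ($pending() > 0) {
        // "> 0" works the same for 0 and 0.0; an untyped "=== 0.0" check would not.
        if ($timeoutInSeconds > 0 && microtime(true) - $start >= $timeoutInSeconds) {
            return false; // timed out with work still pending
        }
        $tick();
        usleep(1000); // suggestion: a 1 ms pause avoids busy-spinning while waiting
    }
    return true;
}

// Usage: three "queries" finish one per tick; a timeout of 0 waits until all are done.
$remaining = 3;
$ok = waitForAll(
    function () use (&$remaining) { return $remaining; },
    function () use (&$remaining) { $remaining--; },
    0
);

// A query that never finishes hits the 50 ms timeout instead of hanging forever.
$timedOut = !waitForAll(function () { return 1; }, function () {}, 0.05);
```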

Use case

Now that we have these two classes, we can line up some queries to run and then reap the results at the end. If the callbacks capture variables from the main scope by reference (with use (&$var)), they can assign the results we need straight back to those variables. Here's a simple example:

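require __DIR__ . '/vendor/autoload.php'; //Composer autoloader

use JohnCurt\AsyncMySQL\ConnectionManager;
use JohnCurt\AsyncMySQL\Query;

//adjust host/credentials/port to your server; 3306 is MySQL's default classic-protocol port
$conn = new ConnectionManager('127.0.0.1','root','root','test',3306);
/** @var mysqli_result|null $result1 */
$result1 = null;
/** @var mysqli_result|null $result2 */
$result2 = null;
$success1 = function($result) use (&$result1) {$result1=$result;};
$success2 = function($result) use (&$result2) {$result2=$result;};
$query1 = new Query('SELECT 1 as num;',$success1);
$query2 = new Query('SELECT 2 as num;',$success2);
$conn->runQuery($query1);
$conn->runQuery($query2);
$conn->reapAll();
//results 1 and 2 should now hold their mysqli_result objects (or still be null if a query failed)
if ($result1 !== null && ($data = $result1->fetch_assoc())){
	//todo
}
if ($result2 !== null && ($data = $result2->fetch_assoc())){
	//todo
}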
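The by-reference capture in use (&$result1) is what lets the callbacks hand results back to the main scope. Without the &, the closure gets its own copy of the variable at definition time, and assignments inside it are lost. A quick standalone comparison (no database needed; the variable names are illustrative):

```php
<?php
// Without &, the closure captures a copy of $byValue when it is defined.
$byValue = null;
$setCopy = function ($result) use ($byValue) { $byValue = $result; };
$setCopy('row');

// With &, the closure writes straight into the variable in the calling scope.
$byRef = null;
$setRef = function ($result) use (&$byRef) { $byRef = $result; };
$setRef('row');

var_dump($byValue, $byRef); // NULL, then string(3) "row"
```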

Hopefully this article has shed some light on the ins and outs of this mysqli (with mysqlnd) functionality that is available in PHP as well as why you might want to use it. Please let me know what you think in the comments below! I’d love to hear your feedback and discuss this in more depth!

