Page Menu
Home
In-Portal Phabricator
Search
Configure Global Search
Log In
Files
F1044424
db_load_balancer.php
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, Jun 25, 11:43 AM
Size
15 KB
Mime Type
text/x-php
Expires
Fri, Jun 27, 11:43 AM (7 h, 57 m)
Engine
blob
Format
Raw Data
Handle
675546
Attached To
rINP In-Portal
db_load_balancer.php
View Options
<?php
/**
* @version $Id: db_load_balancer.php 14934 2011-12-28 18:06:28Z alex $
* @package In-Portal
* @copyright Copyright (C) 1997 - 2011 Intechnic. All rights reserved.
* @license GNU/GPL
* In-Portal is Open Source software.
* This means that this software may have been modified pursuant
* the GNU General Public License, and as distributed it includes
* or is derivative of works licensed under the GNU General Public License
* or other free or open source software licenses.
* See http://www.in-portal.org/license for copyright notices and details.
*/
// Refuse to run when the file is requested directly, i.e. without the
// bootstrap that defines the FULL_PATH constant.
if ( !defined('FULL_PATH') ) {
    die('restricted access!');
}
class
kDBLoadBalancer
extends
kBase
{
/**
 * Current database type; passed as the first constructor argument to every
 * kDBConnection created by this balancer
 *
 * @var string
 * @access protected
 */
protected $dbType = 'mysql';

/**
 * Function to handle sql errors; forwarded unchanged to every created connection
 *
 * @var string|Array
 * @access public
 */
public $errorHandler = '';

/**
 * Database connection settings, keyed by server index.
 * Index 0 always holds the master server (see setup()).
 *
 * @var Array
 * @access protected
 */
protected $servers = Array ();

/**
 * Load (selection weight) of each server, keyed by server index and
 * taken from the "DBLoad" setting of that server
 *
 * @var Array
 * @access protected
 */
protected $serverLoads = Array ();

/**
 * Caches replication lag of servers for the lifetime of this object
 *
 * @var Array
 * @access protected
 */
protected $serverLagTimes = Array ();

/**
 * Already opened connections, keyed by server index
 *
 * @var Array
 * @access protected
 */
protected $connections = Array ();

/**
 * Index of the last used slave connection, or false when no slave was selected yet
 *
 * @var int|bool
 * @access protected
 */
protected $slaveIndex = false;

/**
 * Index of the server, that was used last
 *
 * @var int
 * @access protected
 */
protected $lastUsedIndex = 0;

/**
 * Give up looking for a usable slave after waiting this long.
 * Declared in seconds; the constructor multiplies it by 1e6, so at runtime
 * the value is in microseconds (same unit as the usleep() based backoff).
 *
 * @var int
 * @access protected
 */
protected $DBClusterTimeout = 10;

/**
 * Scale load balancer polling time so that under overload conditions, the database server
 * receives a SHOW STATUS query at an average interval of this many microseconds
 *
 * @var int
 * @access protected
 */
protected $DBAvgStatusPoll = 2000;

/**
 * Indicates, that next database query could be cached, when memory caching is enabled.
 * The flag is handed over to a connection (and reset here) inside openConnection().
 *
 * @var bool
 * @access public
 */
public $nextQueryCachable = false;

/**
 * Indicates, that next query should be sent to master database.
 * The flag is reset by chooseConnection() once it was honoured.
 *
 * @var bool
 * @access public
 */
public $nextQueryFromMaster = false;
/**
 * Creates new instance of load balancer class
 *
 * @param string $dbType database type, forwarded to every created connection
 * @param Array|string $errorHandler sql error handler, forwarded to every created connection
 * @access public
 */
public function __construct($dbType, $errorHandler = '')
{
    parent::__construct();

    $this->dbType = $dbType;
    $this->errorHandler = $errorHandler;

    // The timeout is declared in seconds, but is compared against the sum of
    // usleep() intervals in getSlaveIndex(), so convert it to microseconds.
    $this->DBClusterTimeout *= 1e6;
}
/**
 * Setups load balancer according given configuration
 *
 * The master server is read from $config['Database'] and always stored at
 * index 0 with a load of 0. Optional additional servers are read from
 * $config['Databases'] and appended after it.
 *
 * @param Array $config
 * @return void
 * @access public
 */
public function setup($config)
{
    // start from a clean state, so a repeated call doesn't keep weights
    // of servers, that are no longer configured
    $this->servers = Array ();
    $this->serverLoads = Array ();

    $this->servers[0] = Array (
        'DBHost' => $config['Database']['DBHost'],
        'DBUser' => $config['Database']['DBUser'],
        'DBUserPassword' => $config['Database']['DBUserPassword'],
        'DBName' => $config['Database']['DBName'],
        'DBLoad' => 0,
    );

    if ( isset($config['Databases']) && is_array($config['Databases']) ) {
        // array_merge renumbers numeric keys, so master stays at index 0
        $this->servers = array_merge($this->servers, $config['Databases']);
    }

    foreach ($this->servers as $server_index => $server_setting) {
        // a server without explicit "DBLoad" gets zero weight (same effective
        // weight as before, but without an "undefined index" notice)
        $this->serverLoads[$server_index] = isset($server_setting['DBLoad']) ? $server_setting['DBLoad'] : 0;
    }
}
/**
 * Tells which server index belongs to the master database
 *
 * @return int always 0
 * @access protected
 */
protected function getMasterIndex()
{
    $master_index = 0;

    return $master_index;
}
/**
 * Returns connection index to slave database. This takes into account load ratios and lag times.
 * Side effect: opens connections to databases
 *
 * Falls back to the master index whenever no slave can be used, so the
 * returned value is always usable as a server index.
 *
 * @return int
 * @access protected
 */
protected function getSlaveIndex()
{
    if ( count($this->servers) == 1 || $this->Application->isAdmin ) {
        // skip the load balancing if there's only one server OR in admin console
        return 0;
    }
    elseif ( $this->slaveIndex !== false ) {
        // shortcut if generic reader exists already
        return $this->slaveIndex;
    }

    $total_elapsed = 0;
    $non_error_loads = $this->serverLoads;
    $i = $lagged_slave_mode = false;

    // first try quickly looking through the available servers for a server that meets our criteria
    do {
        $current_loads = $non_error_loads;
        $overloaded_servers = $total_threads_connected = 0;

        while ($current_loads) {
            if ( $lagged_slave_mode ) {
                // when all slave servers are too lagged, then ignore lag and pick random server
                $i = $this->pickRandom($current_loads);
            }
            else {
                $i = $this->getRandomNonLagged($current_loads);

                if ( $i === false && $current_loads ) {
                    // all slaves lagged -> pick random lagged slave then
                    $lagged_slave_mode = true;
                    $i = $this->pickRandom($current_loads);
                }
            }

            if ( $i === false ) {
                // all slaves are down -> use master as a slave
                $this->slaveIndex = $this->getMasterIndex();

                return $this->slaveIndex;
            }

            $conn =& $this->openConnection($i);

            if ( !$conn ) {
                // connection failed -> don't try this server again during this call
                unset($non_error_loads[$i], $current_loads[$i]);
                continue;
            }

            // Perform post-connection backoff
            $threshold = isset($this->servers[$i]['DBMaxThreads']) ? $this->servers[$i]['DBMaxThreads'] : false;
            $backoff = $this->postConnectionBackoff($conn, $threshold);

            if ( $backoff ) {
                // post-connection overload, don't use this server for now
                $total_threads_connected += $backoff;
                $overloaded_servers++;
                unset($current_loads[$i]);
            }
            else {
                // return this server
                break 2;
            }
        }

        // no server found yet
        $i = false;

        // if all servers were down, quit now
        if ( !$non_error_loads ) {
            break;
        }

        // back off for a while
        // scale the sleep time by the number of connected threads, to produce a roughly constant global poll rate
        $avg_threads = $total_threads_connected / $overloaded_servers;

        // usleep() expects a whole number of microseconds; never sleep for
        // less than 1 microsecond, so the elapsed time always advances
        $sleep_time = max(1, (int)($this->DBAvgStatusPoll * $avg_threads));

        usleep($sleep_time);
        $total_elapsed += $sleep_time;
    } while ( $total_elapsed < $this->DBClusterTimeout );

    if ( $i === false ) {
        // no slave became usable (all down or timed out) -> serve this query
        // from master; not remembered, so slaves are retried on next query
        return $this->getMasterIndex();
    }

    // slave connection successful
    if ( $this->slaveIndex <= 0 && $this->serverLoads[$i] > 0 ) {
        $this->slaveIndex = $i;
    }

    return $i;
}
/**
 * Picks a random server among those, that aren't excessively lagged
 *
 * Servers other than index 0, that have a "DBMaxLag" setting, are dropped from
 * the candidates when their lag is unknown (false) or above that setting.
 *
 * @param Array $loads server weights, keyed by server index
 * @return int|bool server index, or false when no weighted candidate remains
 * @access protected
 */
protected function getRandomNonLagged($loads)
{
    foreach ($this->getLagTimes() as $server_index => $lag_time) {
        if ( $server_index == 0 || !isset($this->servers[$server_index]['DBMaxLag']) ) {
            // master, or a server without lag limit, is never filtered out
            continue;
        }

        $max_lag = $this->servers[$server_index]['DBMaxLag'];

        // false = server is not replicating; otherwise compare against the limit
        if ( $lag_time === false || $lag_time > $max_lag ) {
            unset($loads[$server_index]);
        }
    }

    // nothing left, or only zero-weight servers left -> everything is lagged
    $has_candidates = $loads && array_sum($loads) != 0;

    if ( !$has_candidates ) {
        return false;
    }

    // random representative of the remainder
    return $this->pickRandom($loads);
}
/**
 * Select an element from an array of non-normalised probabilities
 *
 * Entries with a weight of zero (or less) are never selected.
 *
 * @param Array $weights weights, keyed by the value to be returned
 * @return int|bool selected key, or false when there is nothing to select from
 * @access protected
 */
protected function pickRandom($weights)
{
    if ( !is_array($weights) || !$weights ) {
        return false;
    }

    $sum = array_sum($weights);

    if ( $sum == 0 ) {
        return false;
    }

    $max = mt_getrandmax();
    $rand = mt_rand(0, $max) / $max * $sum; // random point in [0, $sum]

    $selected = false;
    $accumulated = 0;

    foreach ($weights as $index => $weight) {
        if ( $weight <= 0 ) {
            // without this check a draw of exactly 0 would select the first
            // entry even when its weight is 0
            continue;
        }

        // remember last eligible entry, so float rounding at the upper
        // bound still ends up on a selectable element
        $selected = $index;
        $accumulated += $weight;

        if ( $accumulated >= $rand ) {
            break;
        }
    }

    return $selected;
}
/**
 * Get lag time for each server
 *
 * Results are kept in the application cache for a short time and in this
 * object for the rest of its lifetime. Servers, that can't be connected to,
 * get no entry in the result.
 *
 * @return Array lag per server index (0 for master)
 * @access protected
 */
protected function getLagTimes()
{
    if ( $this->serverLagTimes ) {
        // already determined during this request
        return $this->serverLagTimes;
    }

    $expiry = 5;
    $request_rate = 10;
    $cache_key = 'lag_times:' . $this->servers[0]['DBHost'];

    $cached_times = $this->Application->getCache($cache_key);

    if ( $cached_times ) {
        // the older the cached record, the smaller the upper bound and so the
        // more likely the draw below is 0, which forces a refresh
        $age = adodb_mktime() - $cached_times['timestamp'];
        $upper_bound = max(0, ($expiry - $age) * $request_rate);

        if ( mt_rand(0, $upper_bound) != 0 ) {
            unset($cached_times['timestamp']);
            $this->serverLagTimes = $cached_times;

            return $cached_times;
        }
    }

    // cache record missing or chosen for refresh -> ask the servers
    $lag_times = array ();

    foreach ($this->servers as $server_index => $server_settings) {
        if ( $server_index == 0 ) {
            // master
            $lag_times[$server_index] = 0;
            continue;
        }

        $connection = $this->openConnection($server_index);

        if ( $connection === false ) {
            continue;
        }

        $lag_times[$server_index] = $connection->getSlaveLag();
    }

    // the cached copy carries a timestamp, so its age can be checked later ...
    $cache_record = $lag_times;
    $cache_record['timestamp'] = adodb_mktime();
    $this->Application->setCache($cache_key, $cache_record, $expiry);

    // ... but the caller only gets the per-server lag values
    $this->serverLagTimes = $lag_times;

    return $this->serverLagTimes;
}
/**
 * Determines whether a server should not be used, even though a connection to it was made
 *
 * @param kDBConnection $conn
 * @param int $threshold allowed number of running threads; a false-like value disables the check
 * @return int "Threads_connected" status value when running threads exceed the threshold, 0 otherwise
 * @access protected
 */
protected function postConnectionBackoff(&$conn, $threshold)
{
    if ( $threshold ) {
        $thread_stats = $conn->getStatus('Thread%');

        if ( $thread_stats['Threads_running'] > $threshold ) {
            return $thread_stats['Threads_connected'];
        }
    }

    return 0;
}
/**
 * Returns the connection to the server with given index, opening it when needed
 *
 * Successfully opened connections are remembered and reused. Opening a new
 * connection also makes its index the last used one.
 *
 * @param integer $i Server index (must exist in the server list)
 * @return kDBConnection|false false, when connection could not be opened
 * @access protected
 */
protected function &openConnection($i)
{
    if ( isset($this->connections[$i]) ) {
        $conn =& $this->connections[$i];
    }
    else {
        $settings = $this->servers[$i];
        $settings['serverIndex'] = $i;
        $is_master = $i == $this->getMasterIndex();

        $conn =& $this->reallyOpenConnection($settings, $is_master);

        if ( !$conn->connectionOpened ) {
            // failed connections are not remembered
            $conn = false;
        }
        else {
            $this->connections[$i] =& $conn;
            $this->lastUsedIndex = $i;
        }
    }

    // hand the "cache next query" request over to whichever connection
    // is returned first after the request was made
    if ( $this->nextQueryCachable && is_object($conn) ) {
        $conn->nextQueryCachable = true;
        $this->nextQueryCachable = false;
    }

    return $conn;
}
/**
 * Creates a connection object and tries to connect it to given server
 *
 * The object is returned whether or not the connection attempt succeeded.
 * The database name is always taken from the master settings (index 0).
 *
 * @param Array $server server settings, including "serverIndex"
 * @param bool $is_master
 * @return kDBConnection
 * @access protected
 */
protected function &reallyOpenConnection($server, $is_master)
{
    $constructor_params = array ($this->dbType, $this->errorHandler, $server['serverIndex']);

    $db =& $this->Application->makeClass('kDBConnection', $constructor_params);
    /* @var $db kDBConnection */

    $db->debugMode = $this->Application->isDebugMode();

    $db_name = $this->servers[0]['DBName'];
    $is_slave = !$is_master;

    // NOTE(review): meaning of the 5th argument (true) isn't visible from here - confirm against kDBConnection::Connect
    $db->Connect($server['DBHost'], $server['DBUser'], $server['DBUserPassword'], $db_name, true, $is_slave);

    return $db;
}
/**
 * Runs GetOne on the connection chosen for given sql
 * (first field of first row on success, false otherwise)
 *
 * @param string $sql
 * @param int $offset
 * @return string
 * @access public
 */
public function GetOne($sql, $offset = 0)
{
    return $this->chooseConnection($sql)->GetOne($sql, $offset);
}
/**
 * Runs GetRow on the connection chosen for given sql
 * (first row on success, false otherwise)
 *
 * @param string $sql
 * @param int $offset
 * @return Array
 * @access public
 */
public function GetRow($sql, $offset = 0)
{
    return $this->chooseConnection($sql)->GetRow($sql, $offset);
}
/**
 * Runs GetCol on the connection chosen for given sql
 * (1st column as one-dimensional array on success, false otherwise)
 *
 * @param string $sql
 * @param string $key_field optional field name to be used for resulting array keys
 * @return Array
 * @access public
 */
public function GetCol($sql, $key_field = null)
{
    return $this->chooseConnection($sql)->GetCol($sql, $key_field);
}
/**
 * Runs Query on the connection chosen for given sql
 * (selected rows on success, false otherwise)
 *
 * @param string $sql
 * @param string $key_field optional field name to be used for resulting array keys
 * @param bool $no_debug
 * @return Array
 * @access public
 */
public function Query($sql, $key_field = null, $no_debug = false)
{
    return $this->chooseConnection($sql)->Query($sql, $key_field, $no_debug);
}
/**
 * Performs sql query, that will change database content
 *
 * Such a query is always sent to master, regardless of how its text looks
 * (same policy as doInsert() and doUpdate()).
 *
 * @param string $sql
 * @return bool
 * @access public
 */
public function ChangeQuery($sql)
{
    // Don't rely on statement detection by sql text: force master for this
    // query. The flag is consumed (reset) by chooseConnection().
    $this->nextQueryFromMaster = true;

    $conn =& $this->chooseConnection($sql);

    return $conn->ChangeQuery($sql);
}
/**
 * Passes given value to "qstr" method of the last used connection
 * (adds quotes and backslashes to strings, returns other values as-is)
 *
 * @param mixed $string
 * @return string
 * @access public
 */
public function qstr($string)
{
    return $this->openConnection($this->lastUsedIndex)->qstr($string);
}
/**
 * Passes given array to "qstrArray" method of the last used connection
 *
 * @param Array $array
 * @param string $function name of the quoting function to apply to each element
 * @return Array
 * @access public
 */
public function qstrArray($array, $function = 'qstr')
{
    return $this->openConnection($this->lastUsedIndex)->qstrArray($array, $function);
}
/**
 * Passes an insert request to the master connection, which either performs it right away
 * (useful with small number of queries) or stores it for a later multiple insert
 * (useful with large number of queries)
 *
 * @param Array $fields_hash
 * @param string $table
 * @param string $type
 * @param bool $insert_now
 * @return bool
 * @access public
 */
public function doInsert($fields_hash, $table, $type = 'INSERT', $insert_now = true)
{
    $master_index = $this->getMasterIndex();

    return $this->openConnection($master_index)->doInsert($fields_hash, $table, $type, $insert_now);
}
/**
 * Passes an update of given field values (for records matching $key_clause)
 * to the master connection
 *
 * @param Array $fields_hash
 * @param string $table
 * @param string $key_clause
 * @return bool
 * @access public
 */
public function doUpdate($fields_hash, $table, $key_clause)
{
    $master_index = $this->getMasterIndex();

    return $this->openConnection($master_index)->doUpdate($fields_hash, $table, $key_clause);
}
/**
 * Forwards a call of an undefined method to the last used server connection
 *
 * @param string $name
 * @param Array $arguments
 * @return mixed
 * @access public
 */
public function __call($name, $arguments)
{
    $conn =& $this->openConnection($this->lastUsedIndex);
    $callback = array (&$conn, $name);

    return call_user_func_array($callback, $arguments);
}
/**
 * Returns appropriate connection based on sql
 *
 * Master is used when it was explicitly requested for the next query, when the
 * statement starts with a data/structure changing keyword (in any letter case)
 * or when it mentions the "ses_<sid>" name of current session. Everything else
 * goes to a slave. The chosen index becomes the last used one.
 *
 * @param string $sql
 * @return kDBConnection
 * @access protected
 */
protected function &chooseConnection($sql)
{
    if ( $this->nextQueryFromMaster ) {
        // one-time request -> reset it once honoured
        $this->nextQueryFromMaster = false;
        $index = $this->getMasterIndex();
    }
    else {
        $sid = isset($this->Application->Session) ? $this->Application->GetSID() : '9999999999999999999999';

        // "i" modifier: sql keywords are case-insensitive, so "update ..." must be
        // recognized too; the sid is quoted, since it's data and not a part of the pattern
        $master_pattern = '/(^\s*(ALTER|CREATE|DROP|RENAME|DELETE|DO|INSERT|LOAD|REPLACE|TRUNCATE|UPDATE))|ses_' . preg_quote((string)$sid, '/') . '/i';

        if ( preg_match($master_pattern, $sql) ) {
            $index = $this->getMasterIndex();
        }
        else {
            $index = $this->getSlaveIndex();
        }
    }

    if ( $index === false ) {
        // no slave available -> make the fallback to master explicit instead
        // of relying on false being cast to array key 0
        $index = $this->getMasterIndex();
    }

    $this->lastUsedIndex = $index;
    $conn =& $this->openConnection($index);

    return $conn;
}
}
Event Timeline
Log In to Comment