567 lines
14 KiB
Lua
Executable file
567 lines
14 KiB
Lua
Executable file
--Copyright (c) 2006-2008 Neil Richardson (nrich@iinet.net.au)
|
|
--
|
|
--Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
--of this software and associated documentation files (the "Software"), to deal
|
|
--in the Software without restriction, including without limitation the rights
|
|
--to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
--copies of the Software, and to permit persons to whom the Software is
|
|
--furnished to do so, subject to the following conditions:
|
|
--
|
|
--The above copyright notice and this permission notice shall be included in all
|
|
--copies or substantial portions of the Software.
|
|
--
|
|
--THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
--IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
--FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
--AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
--LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
--OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
|
--IN THE SOFTWARE.
|
|
|
|
module('Memcached', package.seeall)
|
|
|
|
require('socket')
|
|
require('CRC32')
|
|
|
|
local SERVER_RETRIES = 10
|
|
|
|
local STATS_KEYS = {
|
|
malloc = true,
|
|
sizes = true,
|
|
slabs = true,
|
|
items = true,
|
|
}
|
|
|
|
local FLAGS = {
|
|
'STORABLE',
|
|
'COMPRESSED',
|
|
'SERIALISED',
|
|
}
|
|
|
|
local function warn(str)
|
|
io.stderr:write(string.format('Warning: %s\n', tostring(str)))
|
|
end
|
|
|
|
local function _select_server(cache, key)
|
|
local server_count = #cache.servers
|
|
|
|
local hashfunc = cache.hash or CRC32.Hash
|
|
|
|
if server_count == 1 then
|
|
return cache.servers[1].socket
|
|
else
|
|
local serverhash = hashfunc(key)
|
|
|
|
for i = 0, SERVER_RETRIES do
|
|
local index = (serverhash % server_count) + 1
|
|
local server = cache.servers[index].socket
|
|
|
|
if not server then
|
|
serverhash = hashfunc(serverhash .. i)
|
|
else
|
|
return server
|
|
end
|
|
end
|
|
end
|
|
|
|
error('No servers found')
|
|
return nil
|
|
end
|
|
|
|
local function _retrieve(cache, key, str)
|
|
local server = _select_server(cache, key)
|
|
|
|
server:send(str .. '\r\n')
|
|
|
|
local function toboolean(value)
|
|
if type(value) == 'string' then
|
|
if value == 'true' then
|
|
return true
|
|
elseif value == 'false' then
|
|
return false
|
|
end
|
|
end
|
|
|
|
return nil
|
|
end
|
|
|
|
local function extract_flags(str)
|
|
local num = tonumber(str)
|
|
local flags = {}
|
|
|
|
for i = table.maxn(FLAGS), 1, -1 do
|
|
local bf = 2 ^ (i - 1)
|
|
|
|
if num >= bf then
|
|
flags[FLAGS[i]] = true
|
|
num = num - bf
|
|
end
|
|
end
|
|
|
|
return flags
|
|
end
|
|
|
|
local returndata = {}
|
|
while true do
|
|
local line, err = server:receive()
|
|
|
|
if line == 'END' then
|
|
break
|
|
elseif string.sub(line, 1, 5) == 'VALUE' then
|
|
local key,flagstr,size,cas = string.match(line, 'VALUE (%S+) (%d+) (%d+)')
|
|
flags = extract_flags(flagstr)
|
|
|
|
local data = server:receive(size)
|
|
|
|
if flags.COMPRESSED and cache.compress_enabled then
|
|
data = cache.decompress(data)
|
|
end
|
|
|
|
if flags.SERIALISED then
|
|
returndata[key] = cache.decode(data)
|
|
else
|
|
local ldata = tonumber(data) or toboolean(data)
|
|
|
|
if ldata == nil then
|
|
if data == 'nil' then
|
|
returndata[key] = nil
|
|
else
|
|
returndata[key] = data
|
|
end
|
|
else
|
|
returndata[key] = ldata
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
return returndata
|
|
end
|
|
|
|
local function _send(cache, key, str)
|
|
local server = _select_server(cache, key)
|
|
|
|
server:send(str .. "\r\n")
|
|
local line, err = server:receive()
|
|
|
|
if not err then return line end
|
|
end
|
|
|
|
local function _store(cache, op, key, value, expiry)
|
|
local str
|
|
local flags = 0
|
|
|
|
if type(value) == 'table' then
|
|
str = cache.encode(value)
|
|
-- TODO lookup rather than hard code
|
|
flags = flags + 4
|
|
else
|
|
str = tostring(value)
|
|
end
|
|
|
|
if cache.compress_enabled and string.len(str) > cache.compress_threshold then
|
|
local cstr = cache.compress(str)
|
|
|
|
if string.len(cstr) < (string.len(str) * 0.8) then
|
|
str = cstr
|
|
|
|
-- TODO lookup rather than hard code
|
|
flags = flags + 2
|
|
end
|
|
end
|
|
|
|
local len = string.len(str)
|
|
|
|
expiry = expiry or 0
|
|
|
|
local cmd = op .. ' ' .. key .. ' ' .. flags .. ' ' .. expiry .. ' ' .. len .. '\r\n' .. str
|
|
|
|
local res = _send(cache, key, cmd)
|
|
|
|
if res ~= 'STORED' then
|
|
return false, res
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
local function set(cache, key, value, expiry)
|
|
return _store(cache, 'set', key, value, expiry)
|
|
end
|
|
|
|
local function add(cache, key, value, expiry)
|
|
return _store(cache, 'add', key, value, expiry)
|
|
end
|
|
|
|
local function replace(cache, key, value, expiry)
|
|
return _store(cache, 'replace', key, value, expiry)
|
|
end
|
|
|
|
local function get(cache, key)
|
|
local dataset = _retrieve(cache, key, 'get ' .. key)
|
|
return dataset[key]
|
|
end
|
|
|
|
local function delete(cache, key)
|
|
local res = _send(cache, key, 'delete ' .. key)
|
|
|
|
if res == 'NOT_FOUND' then
|
|
return false
|
|
end
|
|
|
|
if res ~= 'DELETED' then
|
|
return false, res
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
local function incr(cache, key, val)
|
|
val = val or 1
|
|
|
|
local res = _send(cache, key, 'incr ' .. key .. ' ' .. val)
|
|
|
|
if res == 'ERROR' or res == 'CLIENT_ERROR' then
|
|
return false, res
|
|
end
|
|
|
|
return res
|
|
end
|
|
|
|
local function decr(cache, key, val)
|
|
val = val or 1
|
|
|
|
local res = _send(cache, key, 'decr ' .. key .. ' ' .. val)
|
|
|
|
if res == 'ERROR' or res == 'CLIENT_ERROR' then
|
|
return false, res
|
|
end
|
|
|
|
return res
|
|
end
|
|
|
|
local function stats(cache, key)
|
|
local servers = {}
|
|
|
|
key = key or ''
|
|
|
|
if string.len(key) > 0 and not STATS_KEYS[key] then
|
|
error(string.format("Unknown stats key '%s'", key))
|
|
end
|
|
|
|
for i,server in pairs(cache.servers) do
|
|
server.socket:send('stats ' .. key .. '\r\n')
|
|
|
|
local stats = {}
|
|
|
|
while true do
|
|
local line, err = server.socket:receive()
|
|
|
|
if line == 'END' or line == 'ERROR' then
|
|
break
|
|
end
|
|
|
|
local k,v = string.match(line, 'STAT (%S+) (%S+)')
|
|
|
|
if k then
|
|
stats[k] = v
|
|
end
|
|
end
|
|
|
|
servers[server.name] = stats
|
|
end
|
|
|
|
return servers
|
|
end
|
|
|
|
local function get_multi(cache, ...)
|
|
local dataset = nil
|
|
|
|
if table.maxn(cache.servers) > 1 then
|
|
dataset = {}
|
|
|
|
for i,k in ipairs(arg) do
|
|
local data = _retrieve(cache, k, 'get ' .. k)
|
|
dataset[k] = data[k]
|
|
end
|
|
else
|
|
local keys = table.concat(arg, ' ')
|
|
dataset = _retrieve(cache, keys, 'get ' .. keys)
|
|
end
|
|
|
|
return dataset
|
|
end
|
|
|
|
local function flush_all(cache)
|
|
local success = true
|
|
|
|
for i,server in ipairs(cache.servers) do
|
|
server.socket:send('flush_all\r\n')
|
|
local res = assert(server.socket:receive())
|
|
|
|
if res ~= 'OK' then
|
|
success = false
|
|
end
|
|
end
|
|
|
|
return success
|
|
end
|
|
|
|
local function disconnect_all(cache)
|
|
while true do
|
|
local server = table.remove(cache.servers)
|
|
|
|
if not server then
|
|
break
|
|
end
|
|
|
|
server.socket:close()
|
|
end
|
|
end
|
|
|
|
local function set_hash(cache, hashfunc)
|
|
cache.hash = hashfunc
|
|
end
|
|
|
|
local function set_encode(cache, func)
|
|
cache.encode = func
|
|
end
|
|
|
|
local function set_decode(cache, func)
|
|
cache.decode = func
|
|
end
|
|
|
|
local function set_compress(cache, func)
|
|
cache.compress = func
|
|
end
|
|
|
|
local function set_decompress(cache, func)
|
|
cache.decompress = func
|
|
end
|
|
|
|
function Connect(hostlist, port)
|
|
local servers = {}
|
|
|
|
if type(hostlist) == 'table' then
|
|
for i,host in pairs(hostlist) do
|
|
local h, p
|
|
|
|
if type(host) == 'table' then
|
|
h = host[1]
|
|
p = host[2]
|
|
elseif type(host) == 'string' then
|
|
h = host
|
|
elseif type(host) == 'number' then
|
|
p = host
|
|
h = nil
|
|
end
|
|
|
|
if not h then
|
|
h = '127.0.0.1'
|
|
end
|
|
|
|
if not p then
|
|
p = 11211
|
|
end
|
|
|
|
local server = socket.connect(h, p)
|
|
|
|
if not server then
|
|
warn('Could not connect to ' .. h .. ':' .. p)
|
|
else
|
|
table.insert(servers, {socket = server, name = string.format('%s:%d', h, p)})
|
|
end
|
|
end
|
|
else
|
|
local address = hostlist
|
|
|
|
if type(address) == 'number' then
|
|
port = address
|
|
address = nil
|
|
end
|
|
|
|
if address == nil then
|
|
address = '127.0.0.1'
|
|
end
|
|
|
|
if port == nil then
|
|
port = 11211
|
|
end
|
|
|
|
local server = socket.connect(address, port)
|
|
|
|
if not server then
|
|
warn('Could not connect to ' .. address .. ':' .. port)
|
|
else
|
|
servers = {{socket = server, name = string.format('%s:%d', address, port)}}
|
|
end
|
|
end
|
|
|
|
if table.maxn(servers) < 1 then
|
|
error('No servers available')
|
|
end
|
|
|
|
local cache = {
|
|
servers = servers,
|
|
|
|
set_hash = set_hash,
|
|
set_encode = set_encode,
|
|
set_decode = set_decode,
|
|
set_decompress = set_decompress,
|
|
set_compress = set_compress,
|
|
|
|
compress_enabled = false,
|
|
enable_compression = function(self, on)
|
|
self.compress_enabled = on
|
|
end,
|
|
|
|
hash = nil,
|
|
encode = function()
|
|
error('No encode function set')
|
|
end,
|
|
|
|
decode = function()
|
|
error('No decode function set')
|
|
end,
|
|
|
|
compress = function()
|
|
error('No compress function set')
|
|
end,
|
|
|
|
decompress = function()
|
|
error('No decompress function set')
|
|
end,
|
|
|
|
-- 10K default
|
|
compress_threshold = 10240,
|
|
set_compress_threshold = function(self, threshold)
|
|
if threshold == nil then
|
|
self:enable_compression(false)
|
|
else
|
|
self.compress_threshold = threshold
|
|
end
|
|
end,
|
|
|
|
set = set,
|
|
add = add,
|
|
replace = replace,
|
|
get = get,
|
|
delete = delete,
|
|
incr = incr,
|
|
decr = decr,
|
|
|
|
get_multi = get_multi,
|
|
stats = stats,
|
|
flush_all = flush_all,
|
|
disconnect_all = disconnect_all,
|
|
}
|
|
|
|
return cache
|
|
end
|
|
|
|
function New(hostlist, port)
|
|
return Connect(hostlist, port)
|
|
end
|
|
|
|
--
|
|
-- Memcached.lua
|
|
--
|
|
-- A pure Lua implementation of a simple memcached client. 1 or more memcached server(s) are currently supported. Requires the luasocket library.
|
|
-- See http://www.danga.com/memcached/ for more information about memcached.
|
|
--
|
|
--
|
|
--
|
|
-- Synopsis
|
|
--
|
|
-- require('Memcached')
|
|
--
|
|
-- memcache = Memcached.Connect('some.host.com', 11000)
|
|
-- OR
|
|
-- memcache = Memcached.New('some.host.com', 11000)
|
|
--
|
|
-- memcache:set('some_key', 1234)
|
|
-- memcache:add('new_key', 'add new value')
|
|
-- memcache:replace('existing_key', 'replace old value')
|
|
--
|
|
-- cached_data = memcache:get('some_key')
|
|
--
|
|
-- memcache:delete('old_key')
|
|
--
|
|
--
|
|
--
|
|
-- Methods:
|
|
--
|
|
-- memcache = Memcached.Connect()
|
|
-- Connect to memcached server at localhost on port number 11211.
|
|
--
|
|
-- memcache = Memcached.Connect(host[, port])
|
|
-- Connect to memcached server at 'host' on port number 'port'. If port is not provided, port 11211 is used.
|
|
--
|
|
---memcache = Memcached.Connect(port)
|
|
-- Connect to memcached server at localhost on port number 'port'.
|
|
--
|
|
-- memcache = Memcached.Connect({{'host', port}, 'host', port})
|
|
-- Connect to multiple memcached servers.
|
|
--
|
|
-- memcache:set(key, value[, expiry])
|
|
-- Unconditionally sets a key to a given value in the memcache. The value for 'expiry' is the expiration
|
|
-- time (default is 0, never expire).
|
|
--
|
|
-- memcache:add(key, value[, expiry])
|
|
-- Like set, but only stores in memcache if the key doesn't already exist.
|
|
--
|
|
-- memcache:replace(key, value[, expiry])
|
|
-- Like set, but only stores in memcache if the key already exists. The opposite of add.
|
|
--
|
|
-- value = memcache:get(key)
|
|
-- Retrieves a key from the memcache. Returns the value or nil
|
|
--
|
|
-- values = memcache:get_multi(...)
|
|
-- Retrieves multiple keys from the memcache doing just one query. Returns a table of key/value pairs that were available.
|
|
--
|
|
-- memcache:delete(key)
|
|
-- Deletes a key. Returns true on deletion, false if the key was not found.
|
|
--
|
|
-- value = memcache:incr(key[, value])
|
|
-- Sends a command to the server to atomically increment the value for key by value, or by 1 if value is nil.
|
|
-- Returns nil if key doesn't exist on server, otherwise it returns the new value after incrementing. Value should be zero or greater.
|
|
--
|
|
-- value = memcache:decr(key[, value])
|
|
-- Like incr, but decrements. Unlike incr, underflow is checked and new values are capped at 0. If server value is 1, a decrement of 2 returns 0, not -1.
|
|
--
|
|
-- servers = memcache:stats([key])
|
|
-- Returns a table of statistical data regarding the memcache server(s). Allowed keys are:
|
|
-- '', 'malloc', 'sizes', 'slabs', 'items'
|
|
--
|
|
-- success = memcache:flush_all()
|
|
-- Runs the memcached "flush_all" command on all configured hosts, emptying all their caches.
|
|
--
|
|
-- memcache:disconnect_all()
|
|
-- Closes all cached sockets to all memcached servers.
|
|
--
|
|
-- memcache:set_hash(hashfunc)
|
|
-- Sets a custom hash function for key values. The default is a CRC32 hashing function.
|
|
-- 'hashfunc' should be defined receiving a single string parameter and returing a single integer value.
|
|
--
|
|
-- memcache:set_encode(func)
|
|
-- Sets a custom encode function for serialising table values. 'func' should be defined receiving a single
|
|
-- table value and returning a single string value.
|
|
--
|
|
-- memcache:set_decode(func)
|
|
-- Sets a custom decode function for deserialising table values. 'func' should be defined receiving a
|
|
-- single single and returning a single table value
|
|
--
|
|
-- memcache:enable_compression(onflag)
|
|
-- Turns data compression support on or off.
|
|
--
|
|
-- memcache:set_compress_threshold(size)
|
|
-- Set the compression threshold. If the value to be stored is larger than `size' bytes (and compression
|
|
-- is enabled), compress before storing.
|
|
--
|
|
-- memcache:set_compress(func)
|
|
-- Sets a custom data compression function. 'func' should be defined receiving a single string value and
|
|
-- returning a single string value.
|
|
--
|
|
-- memcache:set_decompress(func)
|
|
-- Sets a custom data decompression function. 'func' should be defined receiving a single string value and
|
|
-- returning a single string value.
|