nginx-sid/debian/modules/nchan/dev/test.rb
2016-08-31 16:20:00 +03:00

875 lines
23 KiB
Ruby
Executable file

#!/usr/bin/ruby
require 'minitest'
require 'minitest/reporters'
require "minitest/autorun"
Minitest::Reporters.use! [Minitest::Reporters::SpecReporter.new(:color => true)]
require 'securerandom'
require_relative 'pubsub.rb'
require_relative 'authserver.rb'
# Host of the server under test; override with the PUSHMODULE_SERVER environment variable.
SERVER = ENV.fetch("PUSHMODULE_SERVER", "127.0.0.1")
# Port of the server under test, kept as a string; override with PUSHMODULE_PORT.
PORT = ENV.fetch("PUSHMODULE_PORT", "8082")
# Subscriber client type used by pubsub when the caller does not choose one.
DEFAULT_CLIENT = :longpoll
# When this environment variable is set (to anything), the long-message tests are not defined.
OMIT_LONGMSG = ENV["OMIT_LONGMSG"]
#Typhoeus::Config.verbose = true
# Returns a short random identifier: the first six characters of a random
# number (taken from SecureRandom.hex) written in base 36.
def short_id
  random_number = SecureRandom.hex.hex
  random_number.to_s(36)[0, 6]
end
# Builds an absolute http URL on the server under test (SERVER:PORT).
#
# part - path relative to the server root; one leading "/" is dropped so that
#        "/pub/x" and "pub/x" produce the same URL.
def url(part="")
  relative = part.start_with?("/") ? part[1..-1] : part
  "http://#{SERVER}:#{PORT}/#{relative}"
end
# Print the root URL of the server the suite will run against (url with no argument).
puts "Server at #{url}"
# Builds a Publisher and a Subscriber that point at the same channel.
#
# concurrent_clients - second argument handed to Subscriber.new (default 1)
# opt                - optional settings:
#   :sub           - subscriber URL prefix (default "sub/broadcast/")
#   :pub           - publisher URL prefix (default "pub/")
#   :channel       - channel id appended to both prefixes (default: random hex)
#   :client        - subscriber client type (default DEFAULT_CLIENT)
#   :quit_message  - passed to the Subscriber as quit_message (default 'FIN')
#   :timeout       - passed to both the Subscriber and the Publisher
#   :use_message_id, :gzip, :retry_delay, :extra_headers, :verbose
#                  - passed to the Subscriber unchanged
#
# Returns the pair [publisher, subscriber].
def pubsub(concurrent_clients=1, opt={})
  # Label of the calling method; added to both URLs as a ?test= query string.
  test_name = caller_locations(1,1)[0].label
  timeout = opt[:timeout]
  sub_url = opt[:sub] || "sub/broadcast/"
  pub_url = opt[:pub] || "pub/"
  chan_id = opt[:channel] || SecureRandom.hex
  sub = Subscriber.new(
    url("#{sub_url}#{chan_id}?test=#{test_name}"),
    concurrent_clients,
    timeout: timeout,
    use_message_id: opt[:use_message_id],
    # Callers may pass :quit_message; it used to be dropped in favour of a hard-coded 'FIN'.
    quit_message: opt[:quit_message] || 'FIN',
    gzip: opt[:gzip],
    retry_delay: opt[:retry_delay],
    client: opt[:client] || DEFAULT_CLIENT,
    extra_headers: opt[:extra_headers],
    verbose: opt[:verbose]
  )
  pub = Publisher.new(url("#{pub_url}#{chan_id}?test=#{test_name}"), timeout: timeout)
  return pub, sub
end
# Asserts that a subscriber received what a publisher sent.
#
# pub          - publisher; its +messages+ are the expected messages
# sub          - subscriber; its +errors+, +messages+ and +concurrency+ are checked
# check_errors - when true (the default), also require that the subscriber
#                recorded no errors
def verify(pub, sub, check_errors=true)
  if check_errors
    assert sub.errors.empty?, "There were subscriber errors: \r\n#{sub.errors.join "\r\n"}"
  end

  matched, reason = sub.messages.matches?(pub.messages)
  assert matched, reason || "Messages don't match"

  # Each message must have been seen once per concurrent subscriber client.
  position = 0
  sub.messages.each do |received|
    assert_equal sub.concurrency, received.times_seen, "Concurrent subscribers didn't all receive message #{position}."
    position += 1
  end
end
class PubSubTest < Minitest::Test
# Minitest hook run before every test: boots Celluloid.
def setup
Celluloid.boot
end
# Exercises the interval-poll subscriber location.
#
# Checks that a websocket client on the same URL is refused with a 403, that
# the interval-poll client ends up with everything that was published, and
# that at least one of its polls failed with a 304.
def test_interval_poll
  pub, sub = pubsub 1, sub: "/sub/intervalpoll/", client: :intervalpoll, quit_message: 'FIN', retry_delay: 0.5
  ws_sub = Subscriber.new(sub.url, 1, client: :websocket, quit_message: 'FIN')
  got_304s = 0
  sub.on_failure do |msg|
    got_304s += 1
    assert_match(/code 304/, msg) #handshake will be treated as intervalpoll client?...
  end
  ws_sub.run
  sub.run
  sleep 0.4
  # Show the websocket subscriber's own errors on failure; the message used to
  # print the interval-poll subscriber's errors instead.
  assert ws_sub.match_errors(/code 403/), "expected 403 for all non-intervalpoll subscribers, got #{ws_sub.errors.pretty_inspect}"
  ws_sub.terminate
  pub.post ["hello this", "is a thing"]
  sleep 0.3
  pub.post ["oh now what", "is this even a thing?"]
  sleep 0.1
  sleep 1.5
  pub.post "yoo"
  pub.post "FIN"
  sub.wait
  verify pub, sub
  sub.terminate
  assert got_304s > 0, "Expected at least one 304 response"
end
# Checks the channel information returned by the publisher endpoint in its
# plain, JSON, XML and YAML forms: before anything is published (404), after
# publishing, while subscribers are connected, and after they have finished.
def test_channel_info
  require 'json'
  require 'nokogiri'
  require 'yaml'

  expected_subs = 20
  channel = SecureRandom.hex
  pub, sub = pubsub(expected_subs, channel: channel, client: :eventsource)
  pub.nofail = true

  # Requests the channel info as text/json and returns the parsed body.
  json_info = lambda do
    pub.get "text/json"
    JSON.parse pub.response_body
  end

  # A 404 is expected before anything has been published.
  pub.get
  assert_equal 404, pub.response_code
  pub.post ["hello", "what is this i don't even"]
  assert_equal 202, pub.response_code
  pub.get
  assert_equal 200, pub.response_code
  assert_match(/last requested: -?\d+ sec/, pub.response_body)

  info = json_info.call
  assert_equal 2, info["messages"]
  #assert_equal 0, info["requested"]
  assert_equal 0, info["subscribers"]

  sub.run
  sub.wait :ready
  sleep 0.15

  info = json_info.call
  assert_equal 2, info["messages"]
  #assert_equal 0, info["requested"]
  assert_equal expected_subs, info["subscribers"], "text/json subscriber count"

  pub.get "text/xml"
  xml = Nokogiri::XML pub.response_body
  assert_equal 2, xml.at_xpath('//messages').content.to_i
  #assert_equal 0, xml.at_xpath('//requested').content.to_i
  assert_equal expected_subs, xml.at_xpath('//subscribers').content.to_i

  # The three YAML content types must produce the same body.
  yaml_bodies = ["text/yaml", "application/yaml", "application/x-yaml"].map do |mime|
    pub.get mime
    pub.response_body
  end
  parsed_yaml = YAML.load pub.response_body
  assert_equal 2, parsed_yaml["messages"]
  #assert_equal 0, parsed_yaml["requested"]
  assert_equal expected_subs, parsed_yaml["subscribers"]
  assert_equal yaml_bodies[0], yaml_bodies[1]
  assert_equal yaml_bodies[1], yaml_bodies[2]

  pub.accept = "text/json"
  pub.post "FIN"
  #stats right before FIN was issued
  info = JSON.parse pub.response_body
  assert_equal 3, info["messages"]
  #assert_equal 0, info["requested"]
  assert_equal expected_subs, info["subscribers"]

  sub.wait
  info = json_info.call
  assert_equal 3, info["messages"], "number of messages received by channel is wrong"
  #assert_equal 0, info["requested"]
  assert_equal 0, info["subscribers"], "channel should say there are no subscribers"
  sub.terminate
end
# Builds a multiplexed subscriber path: +prefix+ followed by the ids of
# +pubs+ in random order, joined with +delim+.
#
# pubs   - objects responding to +id+
# prefix - path prefix (default "/sub/multi/")
# delim  - separator placed between ids (default "/")
def multi_sub_url(pubs, prefix="/sub/multi/", delim="/")
  shuffled_ids = pubs.map(&:id).shuffle
  joined_ids = shuffled_ids.join(delim)
  "#{prefix}#{joined_ids}"
end
# Pairs a channel id with a Publisher created for "/pub/<id>".
class MultiCheck
  attr_accessor :id, :pub

  # id - channel id; also used to build the publisher URL
  def initialize(id)
    @id = id
    @pub = Publisher.new(url("/pub/#{id}"))
  end
end
# Publishes to several channels and checks that subscribers multiplexed over
# all of them receive every message, in order within each channel.
#
# n         - number of channels, each with its own publisher (default 2)
# delimited - when true, subscribe through '/sub/split/' with ids joined by
#             '_' instead of the default multi_sub_url prefix and delimiter
def test_channel_multiplexing(n=2, delimited=false)
  pubs = Array.new(n) { MultiCheck.new(short_id) }
  sub_url = delimited ? multi_sub_url(pubs, '/sub/split/', '_') : multi_sub_url(pubs)

  # Clients per Subscriber. This used to be stored by overwriting the +n+ parameter.
  concurrency = 15
  scrambles = 5
  subs = []
  scrambles.times do
    sub = Subscriber.new(url(sub_url), concurrency, quit_message: 'FIN', retry_delay: 1, timeout: 20)
    sub.on_failure { false }
    subs << sub
  end
  subs.each(&:run)

  pubs.each { |p| p.pub.post "FIRST from #{p.id}" }
  10.times do |i|
    pubs.each { |p| p.pub.post "hello #{i} from #{p.id}" }
  end
  sleep 1
  5.times do |i|
    pubs.first.pub.post "yes #{i} from #{pubs.first.id}"
  end
  pubs.each do |p|
    10.times do |i|
      p.pub.post "hello #{i} from #{p.id}"
    end
  end

  # A subscriber that joins after most messages were already published.
  latesubs = Subscriber.new(url(sub_url), concurrency, quit_message: 'FIN')
  latesubs.on_failure { false }
  subs << latesubs
  latesubs.run
  sleep 1
  10.times do |i|
    pubs.each { |p| p.pub.post "hello again #{i} from #{p.id}" }
  end
  pubs.first.pub.post "FIN"
  subs.each(&:wait)
  sleep 1

  subs.each do |sub|
    assert_equal 0, sub.errors.count, "Subscriber encountered #{sub.errors.count} errors: #{sub.errors.join ", "}"
    # One queue of expected messages per publisher. The queues are consumed
    # below, so work on copies and leave the publishers' own records alone.
    expected_queues = pubs.map { |p| p.pub.messages.messages.dup }
    sub.messages.each do |msg|
      # A received message is valid only if it is next in line for some channel.
      queue = expected_queues.find { |pending| pending.first == msg.message }
      assert queue, "message not matched"
      queue.shift
    end
    sub.terminate
  end
end
# Runs the multiplexing test with five channels.
def test_channel_multiplexing_5
  test_channel_multiplexing(5)
end
# Runs the multiplexing test with fifteen channels in delimited mode.
def test_channel_delimitered_multiplexing_15
  test_channel_multiplexing(15, true)
end
# Publishes two messages to a running subscriber and checks the publisher's
# response code and the subscriber's message count after each one.
def test_message_delivery
  pub, sub = pubsub
  sub.run
  sleep 0.2
  assert_equal 0, sub.messages.messages.count
  pub.post "hi there"
  assert_equal 201, pub.response_code, "publisher response code"
  sleep 0.2
  assert_equal 1, sub.messages.messages.count, "received message count"
  pub.post "FIN"
  assert_equal 201, pub.response_code, "publisher response code"
  sleep 0.2
  assert_equal 2, sub.messages.messages.count, "received messages count"
  # matches? is unpacked as (result, error) elsewhere in this file (verify,
  # test_no_message_buffer). A returned pair is always truthy, so asserting on
  # the raw return value could never fail; unpack it here as well.
  matched, err = sub.messages.matches? pub.messages
  assert matched, err || "Messages don't match"
  sub.terminate
end
# Publishes one message before the subscriber starts and one after, then
# checks that the subscriber ends up with both.
def test_publish_then_subscribe
  pub, sub = pubsub
  pub.post "hi there"
  sub.run
  pub.post "FIN"
  sub.wait
  assert_equal 2, sub.messages.messages.count
  # matches? is unpacked as (result, error) elsewhere in this file; a returned
  # pair is always truthy, so the raw return value must not be asserted on.
  matched, err = sub.messages.matches? pub.messages
  assert matched, err || "Messages don't match"
  sub.terminate
end
# Subscribing on the "sub/authorized/" location before anything was published
# is expected to fail with a 403 for every subscriber; after a publish the
# same subscribers are expected to receive the messages.
def test_authorized_channels
  #must be published to before subscribing
  n = 5
  pub, sub = pubsub n, timeout: 6, sub: "sub/authorized/"
  sub.on_failure { false }
  sub.run
  sleep 1
  sub.wait
  assert_equal n, sub.finished
  assert sub.match_errors(/code 403/), "expected 403 for all subscribers, got #{sub.errors.pretty_inspect}"
  sub.reset
  pub.post %w( fweep )
  assert_match(/20[12]/, pub.response_code.to_s)
  sleep 0.1
  sub.run
  sleep 0.1
  # Parenthesised so the brace block is unambiguously passed to post; after a
  # paren-less call with a literal argument its binding is left to the parser.
  pub.post(["fwoop", "FIN"]) { assert_match(/20[12]/, pub.response_code.to_s) }
  sub.wait
  verify pub, sub
  sub.terminate
end
# Deletes a channel in three situations: with active subscribers (expects 200,
# the subscriber count in the body and a 410 for the subscribers), with no
# subscribers (expects 200), and when it does not exist (expects 404).
def test_deletion
  #delete active channel
  par = 5
  pub, sub = pubsub par, timeout: 10
  sub.on_failure { false }
  sub.run
  sleep 0.2
  pub.delete
  sleep 0.1
  assert_equal 200, pub.response_code
  # Capture the whole number (not just its first digit). A body without a
  # subscribers line yields nil, i.e. 0, so the assertion fails instead of crashing.
  reported_subscribers = pub.response_body[/subscribers:\s+(\d+)/, 1]
  assert_equal par, reported_subscribers.to_i, "subscriber count after deletion"
  sub.wait
  assert sub.match_errors(/code 410/), "Expected subscriber code 410: Gone, instead was \"#{sub.errors.first}\""
  #delete channel with no subscribers
  pub, sub = pubsub 5, timeout: 1
  pub.post "hello"
  assert_equal 202, pub.response_code
  pub.delete
  assert_equal 200, pub.response_code
  #delete nonexistent channel
  pub, sub = pubsub
  pub.nofail = true
  pub.delete
  assert_equal 404, pub.response_code
end
# Publishes through the "/pub/nobuffer/" location: messages sent before the
# subscribers start are expected to be missing, later ones to arrive.
def test_no_message_buffer
  chan_id = SecureRandom.hex
  pub = Publisher.new url("/pub/nobuffer/#{chan_id}")
  subscribers = []
  40.times do
    subscribers << Subscriber.new(url("/sub/broadcast/#{chan_id}"), 1, use_message_id: false, quit_message: 'FIN')
  end

  pub.post ["this message should not be delivered", "nor this one"]
  subscribers.each(&:run)

  # One second of quiet before each message that should be delivered.
  ["received1", "received2", "FIN"].each do |body|
    sleep 1
    pub.post body
  end

  subscribers.each(&:wait)
  subscribers.each do |s|
    assert s.errors.empty?, "There were subscriber errors: \r\n#{s.errors.join "\r\n"}"
    matched, reason = s.messages.matches? ["received1", "received2", "FIN"]
    assert matched, reason || "Messages don't match"
  end
end
# Runs ten independent publisher/subscriber pairs at once, posts a random
# number of random words to each, and verifies every pair separately.
def test_channel_isolation
  words = %w( foo bar baz bax qqqqqqqqqqqqqqqqqqq eleven andsoon andsoforth feh )
  publishers = []
  subscribers = []
  10.times do |i|
    publishers[i], subscribers[i] = pubsub 15
    subscribers[i].run
  end

  publishers.each do |p|
    rand(1..10).times { p.post words.sample }
  end
  sleep 1
  publishers.each { |p| p.post 'FIN' }
  subscribers.each(&:wait)

  publishers.zip(subscribers) { |p, s| verify p, s }
  subscribers.each(&:terminate)
end
# Runs the broadcast test with three subscriber clients.
def test_broadcast_3
  test_broadcast(3)
end
# Runs the broadcast test with twenty subscriber clients.
def test_broadcast_20
  test_broadcast(20)
end
# Publishes several messages before a subscriber on 'sub/multipart/' starts,
# then one more, and verifies that everything arrived.
def test_longpoll_multipart
  publisher, subscriber = pubsub(1, sub: 'sub/multipart/', use_message_id: false)
  publisher.post("first", "text/x-foobar")
  publisher.post(%w[1 2 3 4])
  subscriber.run
  sleep(0.5)
  publisher.post("FIN")
  subscriber.wait
  verify(publisher, subscriber)
  subscriber.terminate
end
# For every count in +range+: publishes that many numbered messages plus
# "FIN" before the subscriber starts, then verifies that all of them arrive.
#
# range - message counts to try (default 30..35)
def test_longpoll_multipart_extended(range=30..35)
  range.each do |count|
    pub, sub = pubsub 1, sub: 'sub/multipart/', use_message_id: false, timeout: 3
    1.upto(count) { |number| pub.post number.to_s }
    pub.post "FIN"
    sub.run
    sub.wait
    verify pub, sub
    sleep 0.1
  end
end
# Same flow as the multipart long-poll test, but the subscriber listens on
# three channels at once while the publisher posts to the second of them.
def test_multiplexed_longpoll_multipart
  chans = Array.new(3) { short_id }
  sub_path = "sub/multipart_multiplex/#{chans.join "/"}"
  pub, sub = pubsub 1, sub: sub_path, pub: "pub/#{chans[1]}", channel: "", use_message_id: false
  pub.post("first", "text/x-foobar")
  pub.post(%w[1 2 3 4])
  sub.run
  sleep(0.5)
  pub.post("FIN")
  sub.wait
  verify(pub, sub)
  sub.terminate
end
# Broadcasts a handful of messages to +clients+ concurrent subscribers and
# verifies that they were all received.
#
# clients - number of concurrent subscriber clients (default 400)
def test_broadcast(clients=400)
  publisher, subscriber = pubsub(clients)
  publisher.post("!!")
  subscriber.run #celluloid async FTW
  #sleep 2
  publisher.post(["!!!!", "what is this", "it's nothing", "nothing at all really"])
  publisher.post("FIN")
  subscriber.wait
  sleep(0.5)
  verify(publisher, subscriber)
  subscriber.terminate
end
#def test_broadcast_10000
# test_broadcast 10000
#end
# Disabled test (the name does not start with "test_", so Minitest does not run it).
#
# Starts three subscribers one after another on a "sub/first/" URL and three
# on a "sub/last/" URL. It expects only the first subscriber to succeed on
# "first" (the others fail with 409) and only the last one to succeed on
# "last" (the others fail with 404 or 409), and that only the successful
# subscriber on each URL receives the published messages.
def dont_test_subscriber_concurrency
chan=SecureRandom.hex
pub_first = Publisher.new url("pub/first#{chan}")
pub_last = Publisher.new url("pub/last#{chan}")
sub_first, sub_last = [], []
# Build three single-client subscribers per URL. Note that the block
# parameter +url+ hides the url helper inside this block.
{ url("sub/first/first#{chan}") => sub_first, url("sub/last/last#{chan}") => sub_last }.each do |url, arr|
3.times do
sub=Subscriber.new(url, 1, quit_message: 'FIN', timeout: 20)
sub.on_failure do |resp, req|
false
end
arr << sub
end
end
# Start the "first" subscribers 0.1s apart so their order is well defined.
sub_first.each {|s| s.run; sleep 0.1 }
assert sub_first[0].no_errors?
sub_first[1..2].each do |s|
assert s.errors?
assert s.match_errors(/code 409/)
end
# Same for the "last" subscribers; here the most recent one is expected to survive.
sub_last.each {|s| s.run; sleep 0.1 }
assert sub_last[2].no_errors?
sub_last[0..1].each do |s|
assert s.errors?
assert s.match_errors(/code 40[49]/)
end
pub_first.post %w( foo bar FIN )
pub_last.post %w( foobar baz somethingelse FIN )
# Only the surviving subscriber on each URL is waited for and verified.
sub_first[0].wait
sub_last[2].wait
verify pub_first, sub_first[0]
verify pub_last, sub_last[2]
# The rejected subscribers must not have received anything.
sub_first[1..2].each{ |s| assert s.messages.count == 0 }
sub_last[0..1].each{ |s| assert s.messages.count == 0 }
[sub_first, sub_last].each {|sub| sub.each{|s| s.terminate}}
end
# Publishes a batch of messages before the subscriber starts and verifies
# that the subscriber still receives all of them.
def test_queueing
  publisher, subscriber = pubsub(1)
  queued = %w[what is this_thing andnow 555555555555555555555 eleven FIN]
  publisher.post(queued, 'text/plain')
  sleep(0.3)
  subscriber.run
  subscriber.wait
  verify(publisher, subscriber)
  subscriber.terminate
end
# Sends one long message to ten subscribers and verifies its delivery.
#
# kb - message size in units of 1024 characters (default 0.5); the message is
#      a run of "q" followed by "end"
def test_long_message(kb=0.5)
  pub, sub = pubsub 10, timeout: 10
  sub.run
  sleep 0.2
  filler = "q" * ((kb * 1024) - 3)
  pub.post ["#{filler}end", "FIN"]
  sub.wait
  verify pub, sub
  sub.terminate
end
unless OMIT_LONGMSG
#[5, 9, 9.5, 9.9, 10, 11, 15, 16, 17, 18, 19, 20, 30, 50, 100, 200, 300, 600, 900, 3000].each do |n|
# Defines one test_long_message_<size>Kb test for each listed size.
[5, 10, 20, 200, 900].each do |size_kb|
  define_method("test_long_message_#{size_kb}Kb") { test_long_message size_kb }
end
# Publishes messages of steadily growing length (from 5 up to 10000
# characters, growing about 1% plus one per step) and verifies delivery.
def test_message_length_range
  pub, sub = pubsub 2, timeout: 15
  sub.run
  length = 5
  until length > 10000
    pub.post "T" * length
    length = (length * 1.01) + 1
    sleep 0.001
  end
  pub.post "FIN"
  sub.wait
  verify pub, sub
  sub.terminate
end
# Publishes thirty very large messages (2000 * 1024 characters each plus a
# sequence-number prefix) to a single subscriber and verifies delivery.
#
# client - subscriber client type to use (default :longpoll)
def generic_test_long_buffed_messages(client=:longpoll)
  kb = 2000
  #kb=2
  pub, sub = pubsub 1, sub: "/sub/broadcast/", timeout: 10, client: client
  #pub, sub = pubsub 1, sub: "/sub/websocket_only/", client: :websocket
  #sub.on_message do |msg|
  # puts ">>>>>>>message: #{msg.message[0...10]}...|#{msg.message.length}|"
  #end
  sub.run
  sleep 1
  q_body = "#{"q" * ((kb * 1024) - 3)}end"
  r_body = "#{"r" * ((kb * 1024) - 3)}end"
  # Odd sequence numbers carry the "q" body, even ones the "r" body.
  1.upto(30) do |seq|
    body = seq.odd? ? q_body : r_body
    pub.post "#{seq}#{body}"
  end
  pub.post "FIN"
  sub.wait
  verify pub, sub
  pub.delete
  sub.terminate
end
# Defines one test_long_buffed_messages_<client> test per subscriber client type.
[:longpoll, :multipart, :eventsource, :websocket, :chunked].each do |client_type|
  define_method("test_long_buffed_messages_#{client_type}") { generic_test_long_buffed_messages client_type }
end
end
# Publishes through "/pub/2_sec_message_timeout/", waits three seconds, and
# then expects a new subscriber to receive only what is published afterwards.
def test_message_timeout
  publisher, subscriber = pubsub(1, pub: "/pub/2_sec_message_timeout/", timeout: 10)
  # These shouldn't get delivered, so drop them from the expected messages too.
  publisher.post(%w[foo bar etcetera])
  publisher.messages.clear
  sleep(3)
  subscriber.run
  sleep(1)
  publisher.post(%w[what is this even FIN])
  subscriber.wait
  verify(publisher, subscriber)
  subscriber.terminate
end
# Subscribes five clients on "sub/timeout/", publishes one message, and
# expects the subscribers to end with a 408 after receiving it.
def test_subscriber_timeout
  chan = SecureRandom.hex
  subscriber = Subscriber.new(url("sub/timeout/#{chan}"), 5, timeout: 10)
  subscriber.on_failure { false }
  publisher = Publisher.new(url("pub/#{chan}"))
  subscriber.run
  sleep(0.1)
  publisher.post("hello")
  subscriber.wait
  # Errors are expected here, so skip the no-errors check in verify.
  verify(publisher, subscriber, false)
  assert subscriber.match_errors(/code 408/)
  subscriber.terminate
end
# Asserts that a response header contains a given value.
#
# response - object whose +headers+ can be indexed by header name
# header   - name of the header to inspect
# str      - value the header must include
#
# A missing header fails the assertion with the usual message instead of
# raising NoMethodError on nil.
def assert_header_includes(response, header, str)
  value = response.headers[header]
  present = !value.nil? && value.include?(str)
  assert present, "Response header '#{header}: #{value}' must contain \"#{str}\", but does not."
end
# Sends OPTIONS requests to a subscriber and a publisher URL, with and without
# an Origin header, and checks the Allow and Access-Control-* response headers.
def test_access_control_options
  chan = SecureRandom.hex
  sub_methods = %w( GET )
  pub_methods = %w( GET POST DELETE )

  # Subscriber URL, with an Origin header.
  resp = Typhoeus::Request.new(url("sub/broadcast/#{chan}"), method: :OPTIONS, headers: { 'Origin':'example.com' }).run
  assert_equal "*", resp.headers["Access-Control-Allow-Origin"]
  sub_methods.each do |verb|
    assert_header_includes resp, "Access-Control-Allow-Methods", verb
    assert_header_includes resp, "Allow", verb
  end
  %w( If-None-Match If-Modified-Since Content-Type Cache-Control X-EventSource-Event ).each do |name|
    assert_header_includes resp, "Access-Control-Allow-Headers", name
  end

  # Subscriber URL, without an Origin header.
  resp = Typhoeus::Request.new(url("sub/broadcast/#{chan}"), method: :OPTIONS).run
  sub_methods.each { |verb| assert_header_includes resp, "Allow", verb }

  # Publisher URL, with an Origin header.
  resp = Typhoeus::Request.new(url("pub/#{chan}"), method: :OPTIONS, headers: { 'Origin': "example.com" }).run
  assert_equal "*", resp.headers["Access-Control-Allow-Origin"]
  pub_methods.each do |verb|
    assert_header_includes resp, "Access-Control-Allow-Methods", verb
    assert_header_includes resp, "Allow", verb
  end
  %w( Content-Type ).each do |name|
    assert_header_includes resp, "Access-Control-Allow-Headers", name
  end

  # Publisher URL, without an Origin header.
  resp = Typhoeus::Request.new(url("pub/#{chan}"), method: :OPTIONS).run
  pub_methods.each { |verb| assert_header_includes resp, "Allow", verb }
end
# Shared driver for the access-control tests: publishes "FIN" to a fresh
# channel, runs one subscriber with the given Origin header, and hands the
# publisher and subscriber to the caller's block.
#
# opt - settings:
#   :origin               - value of the subscriber's Origin header
#   :pub_url, :sub_url    - forwarded to pubsub as :pub and :sub
#   :verify_sub_response  - optional callable invoked with the second
#                           on_message argument for every message
#
# Yields publisher and subscriber after the subscriber has finished, if a
# block is given.
def generic_test_access_control(opt)
  pub, sub = pubsub 1, extra_headers: { Origin: opt[:origin] }, pub: opt[:pub_url], sub: opt[:sub_url]
  sub.on_message do |_msg, bundle|
    checker = opt[:verify_sub_response]
    checker.call(bundle) if checker
  end
  pub.post "FIN"
  sub.run
  sub.wait
  yield(pub, sub) if block_given?
  sub.terminate
end
# Replaces the etag with "null" after the second received message on a
# multiplexed subscriber and expects any resulting failure to be a 400.
def test_invalid_etag
  chan_id = short_id
  pub = Publisher.new url("/pub/#{chan_id}"), accept: 'text/json'
  pub.post "1. one!!"
  sleep 1
  pub.post "2. tooo"
  pub.post "3. throo"
  sleep 1
  pub.post "4. fooo"

  received = 0
  sub = Subscriber.new(url("/sub/multipart_multiplex/#{short_id}/#{short_id}/#{chan_id}"), 1, quit_message: 'FIN', retry_delay: 1, timeout: 20)
  sub.on_message do |msg, bundle|
    received += 1
    # Corrupt the etag once, right after the second message.
    bundle.etag = "null" if received == 2
  end
  sub.on_failure do |err|
    assert_match(/code 400/, err)
    false
  end
  sub.run
  sleep 1
  pub.post "5. faaa"
  pub.post "FIN"
  sub.wait
end
#def test_expired_messages_with_subscribers
# chan = short_id
# pub, sub = pubsub 1, pub: "/pub/2_sec_message_timeout/", sub: "/sub/intervalpoll/", client: :intervalpoll, timeout: 9000, channel: short_id
# sub.on_failure do |err|
# puts "retry?!!"
# true
# end
# sub.run
# pub.post ["foo", "bar"]
#
# sleep 5
#
# pub.post ["yeah", "what", "the"]
# sub.wait
#end
# Disabled test (the name does not start with "test_", so Minitest does not run it).
#
# Without the auth server running, subscribers on "sub/auth_fail/" are
# expected to fail with a 500. With it running they are expected to fail with
# a 403, while subscribers on "sub/auth/" are expected to receive the
# published messages.
def dont_test_auth
  chan = short_id
  subs = [ :longpoll, :eventsource, :websocket, :multipart ]
  subs.each do |t|
    sub = Subscriber.new(url("sub/auth_fail/#{chan}"), 1, client: t)
    sub.on_failure { false }
    sub.run
    sub.wait
    assert(sub.errors?)
    # assert_match, not assert: a Regexp passed to assert is always truthy,
    # so the old check could never fail.
    assert_match(/code 500/, sub.errors.first)
    sub.terminate
  end
  auth_pid = Process.spawn("bundle exec ./authserver.rb -q")
  begin
    # Poll until the auth server accepts connections.
    loop do
      resp = Typhoeus.get("http://127.0.0.1:8053/", followlocation: true)
      break unless resp.return_code == :couldnt_connect
      sleep 0.20
    end
    subs.each do |t|
      sub = Subscriber.new(url("sub/auth_fail/#{chan}"), 1, client: t)
      sub.on_failure { false }
      sub.run
      sub.wait
      assert(sub.errors?)
      assert_match(/code 403/, sub.errors.first)
      sub.terminate
    end
    pub = Publisher.new url("pub/#{chan}")
    pub.post [ "wut", "waht", "FIN" ]
    subs.each do |t|
      sub = Subscriber.new(url("sub/auth/#{chan}"), 1, client: t, quit_message: 'FIN')
      sub.on_failure { false }
      sub.run
      sub.wait
      verify pub, sub
    end
  ensure
    # Signal 2 (SIGINT) is sent even when an assertion above failed, so the
    # spawned auth server is not left running.
    Process.kill 2, auth_pid
  end
end
# Checks the Access-Control-Allow-Origin handling for subscribers and
# publishers: a wildcard on the default locations, the echoed origin on the
# "from_foo.bar" locations, and a 403 for a forbidden origin.
def test_access_control
  wildcard_check = proc do |bundle|
    assert_equal "*", bundle.headers["Access-Control-Allow-Origin"]
  end
  generic_test_access_control(origin: "example.com", verify_sub_response: wildcard_check) { |pub, sub| verify pub, sub }

  foo_bar_check = proc do |bundle|
    assert_equal "http://foo.bar", bundle.headers["Access-Control-Allow-Origin"]
    %w( Last-Modified Etag ).each { |name| assert_header_includes bundle, "Access-Control-Expose-Headers", name }
  end
  generic_test_access_control(origin: "http://foo.bar", verify_sub_response: foo_bar_check, sub_url: "sub/from_foo.bar/") { |pub, sub| verify pub, sub }

  #test forbidding stuff
  pub, sub = pubsub 1, extra_headers: { "Origin": "http://forbidden.com" }, pub: "pub/from_foo.bar/", sub: "sub/from_foo.bar/", timeout: 1

  # Publishing from the allowed origin is expected to succeed.
  pub.extra_headers = { "Origin": "http://foo.bar" }
  pub.post "yeah"
  assert_match(/20[12]/, pub.response_code.to_s)

  # Publishing from the forbidden origin is expected to raise.
  pub.extra_headers = { "Origin": "http://forbidden.com" }
  post_failed = false
  begin
    pub.post "yeah"
  rescue Exception => e
    # NOTE(review): the publisher's error class is not visible in this file,
    # so the broad rescue is kept as is.
    post_failed = true
    assert_match(/request failed:\s+403/, e.message)
  end
  assert post_failed

  sub.on_failure { false }
  sub.run
  sub.wait
  sub.errors.each { |err| assert_match(/code 403/, err) }
  sub.terminate
end
# Runs a subscriber with gzip enabled on "/sub/gzip/" and verifies delivery of
# short messages and one long random message.
def test_gzip
  #bug: turning on gzip cleared the response etag
  pub, sub = pubsub 1, sub: "/sub/gzip/", gzip: true, retry_delay: 0.3
  sub.run
  sleep 0.1
  pub.post ["2", "123456789A", "alsdjklsdhflsajkfhl", "boq"]
  sleep 1
  pub.post "foobar"
  # 200 random hex chunks appended into one long message.
  long_random = ""
  200.times do
    long_random << SecureRandom.hex
  end
  pub.post long_random
  pub.post "FIN"
  sub.wait
  verify pub, sub
end
# Regression test for https://github.com/slact/nchan/issues/212: an
# eventsource subscriber multiplexed over a regular channel and a channel
# published through "/pub/nobuffer/" must receive messages from both.
def test_issue_212 #https://github.com/slact/nchan/issues/212
  buffered_chan = short_id
  unbuffered_chan = short_id
  sub = Subscriber.new(url("/sub/multi/#{buffered_chan}/#{unbuffered_chan}"), 1, quit_message: 'FIN', client: :eventsource, timeout: 3)
  sub.on_failure { false }
  pub = Publisher.new(url("/pub/#{buffered_chan}"))
  pub_nobuf = Publisher.new(url("/pub/nobuffer/#{unbuffered_chan}"))
  sub.run
  sub.wait :ready
  pub.post %w[yes what this and also]
  sleep 1.1
  pub_nobuf.post "WHAT?!"
  # Fold the other publisher's message into pub's record so that verify
  # compares against everything the subscriber should have seen.
  pub.messages << pub_nobuf.messages.first
  pub.post %w[foo bar baz bzzzt FIN]
  sub.wait
  verify pub, sub
end
# Publishes to one channel through two publisher locations ("/pub/" and
# "/pub_1message/") and verifies what a subscriber receives after each phase.
def test_changing_buffer_length
  chan = short_id
  sub = Subscriber.new(url("sub/broadcast/#{chan}"), 30, quit_message: 'FIN')
  pub_delta = Publisher.new(url("/pub/#{chan}"))
  pub_snapshot = Publisher.new(url("/pub_1message/#{chan}"))

  # Phase 1: plain publishing.
  pub_delta.post %w[foo bar baz bzzzt FIN]
  sub.run
  sub.wait
  verify pub_delta, sub
  sub.reset

  # Phase 2: of the snapshot publisher's posts only the last is expected to
  # remain, followed by the new deltas.
  pub_snapshot.post %w[blah anotherblah]
  pub_snapshot.messages.clear
  pub_snapshot.post "this is the real thing right here"
  pub_delta.messages.clear
  pub_delta.messages << pub_snapshot.messages.first
  pub_delta.post %w[d1 d2 d3 and_other_deltas FIN]
  sub.run
  sub.wait
  verify pub_delta, sub
end
end