diff --git a/src/tsuru_counter.erl b/src/tsuru_counter.erl new file mode 100644 index 0000000000000000000000000000000000000000..bd4b4dcd4c24dd03adad6030af2d819d580076ed --- /dev/null +++ b/src/tsuru_counter.erl @@ -0,0 +1,25 @@ +-module(tsuru_counter). + +-export([ + new/0, + inc/2, + dec/2, + get/2 +]). + +new() -> + ets:new(counter, [set, {keypos, 1}, {write_concurrency, true}]). + +inc(Counter, Key) -> + ets:update_counter(Counter, Key, {2, 1}, {Key, 0}). + +dec(Counter, Key) -> + ets:update_counter(Counter, Key, {2, -1, 0, 0}, {Key, 0}). + +get(Counter, Key) -> + case ets:lookup(Counter, Key) of + [] -> + 0; + [{Key, Count}] -> + Count + end. diff --git a/src/tsuru_forward_decay.erl b/src/tsuru_forward_decay.erl index a2db0584f259c4fee0c47bc55fc70824b2b861eb..c031772e999ed00a183892d2f372d3d190672a43 100644 --- a/src/tsuru_forward_decay.erl +++ b/src/tsuru_forward_decay.erl @@ -1,12 +1,23 @@ -module(tsuru_forward_decay). --export([new/1, new/4, landmark/1, landmark/2, event/2, event/3, aggregates/2]). +-export([ + new/0, + new/3, + set_block_count/2, + set_landmark/1, + set_landmark/2, + add_event/2, + add_event/3, + events/1, + aggregates/1, + aggregates/2 +]). -record(forward_decay, { id :: term(), time_unit :: erlang:time_unit(), block_size :: pos_integer(), - block_count :: pos_integer(), + block_count :: pos_integer() | infinity, scale_threshold :: float(), landmark :: float(), events :: ets:tab() @@ -16,69 +27,88 @@ -type event_id() :: {float(), integer()}. -export_type([forward_decay/0]). --spec new(term()) -> forward_decay(). -new(Id) -> - new(Id, seconds, 60, 6). +-spec new() -> forward_decay(). +new() -> + new(seconds, 1, infinity). --spec new(term(), erlang:time_unit(), pos_integer(), pos_integer()) -> forward_decay(). -new(Id, TimeUnit, BlockSize, BlockCount) -> - Landmark = erlang:system_time(TimeUnit) / BlockSize, +-spec new(erlang:time_unit(), pos_integer(), pos_integer() | infinity) -> forward_decay(). +new(TimeUnit, BlockSize, BlockCount) -> Events = ets:new(events, [ordered_set, {write_concurrency, true}]), - ScaleThreshold = 1 / math:exp(BlockCount), - #forward_decay{ - id = Id, + ForwardDecay = #forward_decay{ time_unit = TimeUnit, block_size = BlockSize, - block_count = BlockCount, - scale_threshold = ScaleThreshold, - landmark = Landmark, events = Events - }. + }, + set_block_count(BlockCount, set_landmark(ForwardDecay)). + +-spec set_block_count(pos_integer() | infinity, forward_decay()) -> forward_decay(). +set_block_count(infinity, ForwardDecay) -> + ForwardDecay#forward_decay{block_count = infinity, scale_threshold = 0.0}; +set_block_count(BlockCount, ForwardDecay) -> + ScaleThreshold = 1 / math:exp(BlockCount), + ForwardDecay#forward_decay{block_count = BlockCount, scale_threshold = ScaleThreshold}. --spec landmark(forward_decay()) -> forward_decay(). -landmark(ForwardDecay = #forward_decay{time_unit = TimeUnit}) -> - landmark(erlang:system_time(TimeUnit), ForwardDecay). +-spec set_landmark(forward_decay()) -> forward_decay(). +set_landmark(ForwardDecay = #forward_decay{time_unit = TimeUnit}) -> + set_landmark(erlang:system_time(TimeUnit), ForwardDecay). --spec landmark(integer(), forward_decay()) -> forward_decay(). -landmark(Time, ForwardDecay = #forward_decay{block_size = BlockSize}) -> +-spec set_landmark(integer(), forward_decay()) -> forward_decay(). +set_landmark(Time, ForwardDecay = #forward_decay{block_size = BlockSize}) -> Landmark = Time / BlockSize, ForwardDecay#forward_decay{landmark = Landmark}. --spec event(integer(), number(), forward_decay()) -> forward_decay(). -event(Value, ForwardDecay = #forward_decay{time_unit = TimeUnit}) -> - event(erlang:system_time(TimeUnit), Value, ForwardDecay). +-spec add_event(number(), forward_decay()) -> ok | {ok, forward_decay()}. +add_event(Value, ForwardDecay = #forward_decay{time_unit = TimeUnit}) -> + add_event(erlang:system_time(TimeUnit), Value, ForwardDecay). --spec event(integer(), number(), forward_decay()) -> forward_decay(). -event(Time, Value, ForwardDecay = #forward_decay{block_size = BlockSize, landmark = Landmark}) -> - Scale = math:exp((Time / BlockSize) - Landmark), - Id = {Scale, erlang:unique_integer([monotonic])}, %% Prevent collisions in set +-spec add_event(integer(), number(), forward_decay()) -> ok | {ok, forward_decay()}. +add_event(Time, Value, ForwardDecay = #forward_decay{block_size = BlockSize, landmark = Landmark}) -> + UMI = erlang:unique_integer([monotonic]), + Id = {compute_scale(Time, BlockSize, Landmark), UMI}, Events = ForwardDecay#forward_decay.events, ets:insert(Events, {Id, Value}), prune(ForwardDecay, ets:first(Events), Id, 0), scale(ForwardDecay, ets:first(Events)). +-spec events(forward_decay()) -> [{integer(), number()}]. +events(#forward_decay{block_size = BlockSize, landmark = Landmark, events = Events}) -> + [{compute_time(Scale, BlockSize, Landmark), Value} || {{Scale, _UMI}, Value} <- ets:tab2list(Events)]. + +-spec aggregates(forward_decay()) -> {float(), float()}. +aggregates(ForwardDecay = #forward_decay{time_unit = TimeUnit}) -> + aggregates(erlang:system_time(TimeUnit), ForwardDecay). + -spec aggregates(integer(), forward_decay()) -> {float(), float()}. aggregates(Time, #forward_decay{block_size = BlockSize, landmark = Landmark, events = Events}) -> - Scale = math:exp((Time / BlockSize) - Landmark), + Scale = compute_scale(Time, BlockSize, Landmark), ets:foldl( fun({{ScaleEvent, _}, Value}, {Count, Sum}) -> Weight = ScaleEvent / Scale, {Count + Weight, Sum + Weight * Value} end, {0, 0}, Events). - %%%=================================================================== %%% Internal functions -%%%=================================================================== +%%%======================================================ยง============= +-spec compute_time(float(), pos_integer(), float()) -> integer(). +compute_time(Scale, BlockSize, Landmark) -> + round((Landmark + math:log(Scale)) * BlockSize). --spec prune(forward_decay(), event_id() | '$end_of_table', event_id() | '$end_of_table', integer()) -> integer(). -prune(_ForwardDecay, _Current, '$end_of_table', Count) -> - Count; -prune(_ForwardDecay, '$end_of_table', _Last, Count) -> +-spec compute_scale(pos_integer(), pos_integer(), float()) -> float(). +compute_scale(Time, BlockSize, Landmark) -> + case Time / BlockSize of + BlockTime when BlockTime < Landmark -> + throw(time_before_landmark); + BlockTime -> + math:exp(BlockTime - Landmark) + end. + +-spec prune(forward_decay(), event_id(), event_id(), integer()) -> integer(). +prune(#forward_decay{block_count = infinity}, _Current, _Last, Count) -> Count; prune(ForwardDecay, Current = {ScaleCurrent, _}, Last = {ScaleLast, _}, Count) -> case ScaleCurrent / ScaleLast of - R when R < ForwardDecay#forward_decay.scale_threshold -> + R when R =< ForwardDecay#forward_decay.scale_threshold -> Next = ets:next(ForwardDecay#forward_decay.events, Current), ets:delete(ForwardDecay#forward_decay.events, Current), prune(ForwardDecay, Next, Last, Count + 1); @@ -86,8 +116,8 @@ prune(ForwardDecay, Current = {ScaleCurrent, _}, Last = {ScaleLast, _}, Count) - Count end. --spec scale(forward_decay(), event_id()| '$end_of_table') -> ok | {ok, forward_decay()}. -scale(_ForwardDecay, '$end_of_table') -> +-spec scale(forward_decay(), event_id() | float()) -> ok | {ok, forward_decay()}. +scale(#forward_decay{block_count = infinity}, _Id) -> ok; scale(ForwardDecay = #forward_decay{block_count = BlockCount}, {Scale, _}) -> case math:log(Scale) of diff --git a/test/eunit_ide.erl b/test/eunit_ide.erl new file mode 100644 index 0000000000000000000000000000000000000000..9a522f0eab943bf80f947d38abc25360740e3918 --- /dev/null +++ b/test/eunit_ide.erl @@ -0,0 +1,18 @@ +-module(eunit_ide). + +-export([test/1, test/2]). + +-define(INFINITE_TIMOUT, 31536000). %% 365*24*60*60, a year in seconds + +test(Tests) -> + test(Tests, [verbose]). + +test(Tests, Options) -> + case lists:keytake(timeout, 1, Options) of + {value, {timeout, infinite}, NewOptions} -> + eunit:test({timeout, ?INFINITE_TIMOUT, Tests}, NewOptions); + {value, {timeout, Timeout}, NewOptions} when is_integer(Timeout) -> + eunit:test({timeout, Timeout, Tests}, NewOptions); + false -> + eunit:test(Tests, Options) + end. diff --git a/test/tsuru_application_test.erl b/test/tsuru_application_test.erl index deb8b63aa80a81da9de9e00dc19870962924db32..1fc030b5e126aae99ccd5dbddc9d6fe46c221358 100644 --- a/test/tsuru_application_test.erl +++ b/test/tsuru_application_test.erl @@ -1,5 +1,4 @@ -module(tsuru_application_test). --author("erikh"). -include_lib("eunit/include/eunit.hrl"). diff --git a/test/tsuru_counter_test.erl b/test/tsuru_counter_test.erl new file mode 100644 index 0000000000000000000000000000000000000000..718225f9ad2ec41079732ee129bac426f9ec929c --- /dev/null +++ b/test/tsuru_counter_test.erl @@ -0,0 +1,21 @@ +-module(tsuru_counter_test). + +-include_lib("eunit/include/eunit.hrl"). + +basic_test() -> + Counter = tsuru_counter:new(), + ?assertEqual(1, tsuru_counter:inc(Counter, test)), + ?assertEqual(2, tsuru_counter:inc(Counter, test)), + ?assertEqual(3, tsuru_counter:inc(Counter, test)), + ?assertEqual(2, tsuru_counter:dec(Counter, test)), + ?assertEqual(1, tsuru_counter:dec(Counter, test)), + ?assertEqual(0, tsuru_counter:dec(Counter, test_a)), + ?assertEqual(2, tsuru_counter:inc(Counter, test)), + ?assertEqual(3, tsuru_counter:inc(Counter, test)), + ?assertEqual(3, tsuru_counter:get(Counter, test)), + ?assertEqual(2, tsuru_counter:dec(Counter, test)), + ?assertEqual(1, tsuru_counter:dec(Counter, test)), + ?assertEqual(0, tsuru_counter:dec(Counter, test)), + ?assertEqual(0, tsuru_counter:dec(Counter, test)), + ?assertEqual(1, tsuru_counter:inc(Counter, test_a)), + ?assertEqual(0, tsuru_counter:get(Counter, test_b)). diff --git a/test/tsuru_forward_decay_test.erl b/test/tsuru_forward_decay_test.erl new file mode 100644 index 0000000000000000000000000000000000000000..775360f467719684ddebbb6c556bc5aee1320dcd --- /dev/null +++ b/test/tsuru_forward_decay_test.erl @@ -0,0 +1,64 @@ +-module(tsuru_forward_decay_test). + +-include_lib("eunit/include/eunit.hrl"). + +-define(EPSILON, 0.0000000001). + +basic_test() -> + TestEvents = [{N, 100} || N <- lists:seq(1, 3600)], + InitialFD = tsuru_forward_decay:set_landmark(0, tsuru_forward_decay:new(seconds, 60, 6)), + PopulatedFD = lists:foldl( + fun({Time, Value}, ForwardDecay) -> + case tsuru_forward_decay:add_event(Time, Value, ForwardDecay) of + {ok, NewForwardDecay} -> + NewForwardDecay; + ok -> + ForwardDecay + end + end, InitialFD, TestEvents), + ActiveEvents = tsuru_forward_decay:events(PopulatedFD), + ?assertEqual(360, length(ActiveEvents)), + {Count, Sum} = tsuru_forward_decay:aggregates(3600, PopulatedFD), + Average = Sum / Count, + ?assert(abs(100 - Average) < ?EPSILON). + +infinity_order_test() -> + TestEvents = [{N, 10} || N <- lists:seq(1, 500)], + InitialFD = tsuru_forward_decay:set_landmark(1, tsuru_forward_decay:new()), + PopulatedFD = lists:foldr( + fun({Time, Value}, ForwardDecay) -> + case tsuru_forward_decay:add_event(Time, Value, ForwardDecay) of + {ok, NewForwardDecay} -> + NewForwardDecay; + ok -> + ForwardDecay + end + end, InitialFD, TestEvents), + ok, + ActiveEvents = tsuru_forward_decay:events(PopulatedFD), + ?assertEqual(500, length(ActiveEvents)), + {Count, Sum} = tsuru_forward_decay:aggregates(5, PopulatedFD), + Average = Sum / Count, + ?assert(abs(10 - Average) < ?EPSILON). + +exception_test() -> + InitialFD = tsuru_forward_decay:set_landmark(10, tsuru_forward_decay:new()), + try tsuru_forward_decay:add_event(9, 10, InitialFD) + catch + throw:Exception -> + ?assertEqual(time_before_landmark, Exception) + end. + +clock_test() -> + FD = tsuru_forward_decay:new(), + timer:sleep(500), + tsuru_forward_decay:add_event(10, FD), + timer:sleep(500), + tsuru_forward_decay:add_event(10, FD), + timer:sleep(500), + tsuru_forward_decay:add_event(10, FD), + ActiveEvents = tsuru_forward_decay:events(FD), + ?assertEqual(3, length(ActiveEvents)), + {Count, Sum} = tsuru_forward_decay:aggregates(FD), + Average = Sum / Count, + ?assert(abs(10 - Average) < ?EPSILON). diff --git a/test/tsuru_mdns_test.erl b/test/tsuru_mdns_test.erl index d0cb3c96625890efd3a40c067dfafe5291ac3bd1..714e473ae15c64b7b23a43f2adf33a1c79de5fdf 100644 --- a/test/tsuru_mdns_test.erl +++ b/test/tsuru_mdns_test.erl @@ -1,5 +1,4 @@ -module(tsuru_mdns_test). --author("erikh"). -include_lib("eunit/include/eunit.hrl"). @@ -13,4 +12,4 @@ publish_test() -> ?assert(proplists:is_defined("127.0.0.1", TestBrokers)). timeout_test() -> - {ok, TestBrokers} = tsuru_mdns:discover_brokers("_test._tcp.local"). + {ok, _TestBrokers} = tsuru_mdns:discover_brokers("_test._tcp.local"). diff --git a/test/tsuru_string_test.erl b/test/tsuru_string_test.erl index 1209a66c75518802c1853623bc2128589a03d8e2..fc67e4a38869051d5cf1a081ee5797cff3e07f2e 100644 --- a/test/tsuru_string_test.erl +++ b/test/tsuru_string_test.erl @@ -1,5 +1,4 @@ -module(tsuru_string_test). --author("erikh"). -include_lib("eunit/include/eunit.hrl").