diff --git a/Makefile b/Makefile index ecced7c1a5c72599e28fbbdf5f9193688e9617bd..21e3734953e844324085338bed4e8a62e88b6902 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ compile: @$(REBAR) compile eunit: - @$(REBAR) do eunit --cover --dir=test, cover --verbose + @$(REBAR) as test do eunit --cover --dir=test, cover --verbose dialyzer: @$(REBAR) dialyzer diff --git a/src/tsuru_application.erl b/src/tsuru_application.erl index 7ebcca259318817a21bd556405fc0fc194c69a0a..192359b1a40e8063c739e876fdbd091d0b79a352 100644 --- a/src/tsuru_application.erl +++ b/src/tsuru_application.erl @@ -1,12 +1,15 @@ -module(tsuru_application). --author("erikh"). -%% API -export([set_env/1, get_os_env/1]). +-spec set_env(atom()) -> [{'ok', {atom(), term()}}]. set_env(Application) -> - [{application:set_env(Application, Name, Value, [{timeout, infinity}, {persistent, true}]), {Name, Value}} || {Name, Value} <- get_os_env(Application)]. + lists:map( + fun({Name, Value}) -> + {application:set_env(Application, Name, Value, [{timeout, infinity}, {persistent, true}]), {Name, Value}} + end, get_os_env(Application)). +-spec get_os_env(atom()) -> [{atom(), term()}]. get_os_env(Application) -> S1 = string:to_upper(atom_to_list(Application)), S2 = re:replace(S1, "-", "_", [global, {return, list}]), @@ -17,9 +20,10 @@ get_os_env(Application) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +-spec get_os_env(string(), [string()], erl_eval:binding_struct()) -> erl_eval:bindings(). get_os_env(_Pattern, [], Bindings) -> erl_eval:bindings(Bindings); - get_os_env(Pattern, [Var | Vars], Bindings) -> case re:run(Var, Pattern) of {match, [{0, _}, {S1, L1}, {S2, L2}]} -> diff --git a/src/tsuru_forward_decay.erl b/src/tsuru_forward_decay.erl new file mode 100644 index 0000000000000000000000000000000000000000..a2db0584f259c4fee0c47bc55fc70824b2b861eb --- /dev/null +++ b/src/tsuru_forward_decay.erl @@ -0,0 +1,109 @@ +-module(tsuru_forward_decay). + +-export([new/1, new/4, landmark/1, landmark/2, event/2, event/3, aggregates/2]). + +-record(forward_decay, { + id :: term(), + time_unit :: erlang:time_unit(), + block_size :: pos_integer(), + block_count :: pos_integer(), + scale_threshold :: float(), + landmark :: float(), + events :: ets:tab() +}). + +-type forward_decay() :: #forward_decay{}. +-type event_id() :: {float(), integer()}. +-export_type([forward_decay/0]). + +-spec new(term()) -> forward_decay(). +new(Id) -> + new(Id, seconds, 60, 6). + +-spec new(term(), erlang:time_unit(), pos_integer(), pos_integer()) -> forward_decay(). +new(Id, TimeUnit, BlockSize, BlockCount) -> + Landmark = erlang:system_time(TimeUnit) / BlockSize, + Events = ets:new(events, [ordered_set, {write_concurrency, true}]), + ScaleThreshold = 1 / math:exp(BlockCount), + #forward_decay{ + id = Id, + time_unit = TimeUnit, + block_size = BlockSize, + block_count = BlockCount, + scale_threshold = ScaleThreshold, + landmark = Landmark, + events = Events + }. + +-spec landmark(forward_decay()) -> forward_decay(). +landmark(ForwardDecay = #forward_decay{time_unit = TimeUnit}) -> + landmark(erlang:system_time(TimeUnit), ForwardDecay). + +-spec landmark(integer(), forward_decay()) -> forward_decay(). +landmark(Time, ForwardDecay = #forward_decay{block_size = BlockSize}) -> + Landmark = Time / BlockSize, + ForwardDecay#forward_decay{landmark = Landmark}. + +-spec event(integer(), number(), forward_decay()) -> forward_decay(). +event(Value, ForwardDecay = #forward_decay{time_unit = TimeUnit}) -> + event(erlang:system_time(TimeUnit), Value, ForwardDecay). + +-spec event(integer(), number(), forward_decay()) -> forward_decay(). +event(Time, Value, ForwardDecay = #forward_decay{block_size = BlockSize, landmark = Landmark}) -> + Scale = math:exp((Time / BlockSize) - Landmark), + Id = {Scale, erlang:unique_integer([monotonic])}, %% Prevent collisions in set + Events = ForwardDecay#forward_decay.events, + ets:insert(Events, {Id, Value}), + prune(ForwardDecay, ets:first(Events), Id, 0), + scale(ForwardDecay, ets:first(Events)). + +-spec aggregates(integer(), forward_decay()) -> {float(), float()}. +aggregates(Time, #forward_decay{block_size = BlockSize, landmark = Landmark, events = Events}) -> + Scale = math:exp((Time / BlockSize) - Landmark), + ets:foldl( + fun({{ScaleEvent, _}, Value}, {Count, Sum}) -> + Weight = ScaleEvent / Scale, + {Count + Weight, Sum + Weight * Value} + end, {0, 0}, Events). + + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec prune(forward_decay(), event_id() | '$end_of_table', event_id() | '$end_of_table', integer()) -> integer(). +prune(_ForwardDecay, _Current, '$end_of_table', Count) -> + Count; +prune(_ForwardDecay, '$end_of_table', _Last, Count) -> + Count; +prune(ForwardDecay, Current = {ScaleCurrent, _}, Last = {ScaleLast, _}, Count) -> + case ScaleCurrent / ScaleLast of + R when R < ForwardDecay#forward_decay.scale_threshold -> + Next = ets:next(ForwardDecay#forward_decay.events, Current), + ets:delete(ForwardDecay#forward_decay.events, Current), + prune(ForwardDecay, Next, Last, Count + 1); + _ -> + Count + end. + +-spec scale(forward_decay(), event_id()| '$end_of_table') -> ok | {ok, forward_decay()}. +scale(_ForwardDecay, '$end_of_table') -> + ok; +scale(ForwardDecay = #forward_decay{block_count = BlockCount}, {Scale, _}) -> + case math:log(Scale) of + TimeDelta when TimeDelta > BlockCount -> + scale(ForwardDecay, TimeDelta); + _ -> + ok + end; +scale(ForwardDecay = #forward_decay{landmark = Landmark, events = Events}, TimeDelta) -> + R = math:exp(-1 * TimeDelta), + NewLandmark = Landmark + TimeDelta, + NewEvents = ets:new(events, [ordered_set, {write_concurrency, true}]), + ets:foldl( + fun({{Scale, UMI}, Value}, Count) -> + Id = {R * Scale, UMI}, + ets:insert(NewEvents, {Id, Value}), + Count + 1 + end, 0, Events), + {ok, ForwardDecay#forward_decay{landmark = NewLandmark, events = NewEvents}}. diff --git a/src/tsuru_mdns.erl b/src/tsuru_mdns.erl index d16ac5efe7784918508bc06a4bf2366813a65565..a2baf08d990cbd26882f5aed5c20bfabd06c4e4f 100644 --- a/src/tsuru_mdns.erl +++ b/src/tsuru_mdns.erl @@ -1,27 +1,34 @@ -module(tsuru_mdns). --author("erikh"). -include_lib("kernel/src/inet_dns.hrl"). -define(MDNS_ADDR, {224, 0, 0, 251}). -define(MDNS_PORT, 5353). -%% API -export([start_link/3, publish_broker/3, discover_brokers/1, discover_brokers/2]). -%% Exported types -export_type([brokers/0]). -type brokers() :: [{Address :: string(), Port :: inet:port_number()}]. +-type dns_rec() :: #dns_rec{}. --spec(start_link(Domain :: string(), Address :: string(), Port :: inet:port_number()) -> {ok, Pid :: pid()}). +-spec start_link(string(), string(), inet:port_number()) -> {ok, pid()}. start_link(Domain, Address, Port) -> {ok, publish_broker(Domain, Address, Port)}. --spec(publish_broker(Domain :: string(), Address :: string(), Port :: inet:port_number()) -> - {ok, Pid :: pid()} | - {error, Reason :: inet:posix()}). +-spec publish_broker(string(), string(), inet:port_number()) -> + {ok, pid()} | + {error, inet:posix()}. publish_broker(Domain, Address, Port) -> - case gen_udp:open(?MDNS_PORT, [{active, false}, {reuseaddr, true}, {ip, ?MDNS_ADDR}, {multicast_ttl, 4}, {multicast_loop, false}, {mode, binary}, {add_membership, {?MDNS_ADDR, {0, 0, 0, 0}}}]) of + Opts = [ + {active, false}, + {reuseaddr, true}, + {ip, ?MDNS_ADDR}, + {multicast_ttl, 4}, + {multicast_loop, false}, + {mode, binary}, + {add_membership, {?MDNS_ADDR, {0, 0, 0, 0}}} + ], + case gen_udp:open(?MDNS_PORT, Opts) of {ok, Socket} -> Response = #dns_rec{ header = #dns_header{qr = 1, opcode = ?QUERY}, @@ -36,7 +43,7 @@ publish_broker(Domain, Address, Port) -> {error, Reason} end. --spec(receive_request(Domain :: string(), Socket :: port(), Response :: #dns_rec{}) -> any()). +-spec receive_request(string(), port(), dns_rec()) -> any(). receive_request(Domain, Socket, Response) -> case gen_udp:recv(Socket, 1024) of {ok, {Address, Port, Packet}} -> @@ -46,7 +53,7 @@ receive_request(Domain, Socket, Response) -> receive_request(Domain, Socket, Response) end. --spec(handle_request(Domain :: string(), Address :: inet:ip_address(), Port :: inet:port_number(), Packet :: binary(), Response :: #dns_rec{}) -> ok). +-spec handle_request(string(), inet:ip_address(), inet:port_number(), binary(), dns_rec()) -> ok. handle_request(Domain, Address, Port, Packet, Response) -> case inet_dns:decode(Packet) of {ok, Request} -> @@ -62,15 +69,11 @@ handle_request(Domain, Address, Port, Packet, Response) -> ok end. --spec(discover_brokers(Domain :: string()) -> - {ok, Brokers :: brokers()} | - {error, Reason :: not_owner | inet:posix()}). +-spec discover_brokers(string()) -> {ok, brokers()} | {error, not_owner | inet:posix()}. discover_brokers(Domain) -> discover_brokers(Domain, 3000). --spec(discover_brokers(Domain :: string(), Timeout :: pos_integer()) -> - {ok, Brokers :: brokers()} | - {error, Reason :: not_owner | inet:posix()}). +-spec discover_brokers(string(), pos_integer()) -> {ok, brokers()} | {error, not_owner | inet:posix()}. discover_brokers(Domain, Timeout) -> case gen_udp:open(0, [{broadcast, true}, {active, false}, {mode, binary}]) of {ok, Socket} -> @@ -81,9 +84,9 @@ discover_brokers(Domain, Timeout) -> {error, Reason} end. --spec(receive_response(Domain :: string(), Socket :: port(), Timeout :: pos_integer(), Brokers :: brokers()) -> - {ok, Brokers :: brokers()} | - {error, Reason :: not_owner | inet:posix()}). +-spec receive_response(string(), port(), pos_integer(), brokers()) -> + {ok, brokers()} | + {error, not_owner | inet:posix()}. receive_response(Domain, Socket, Timeout, Brokers) -> case gen_udp:recv(Socket, 1024, Timeout) of {ok, {Address, Port, Packet}} -> @@ -94,26 +97,30 @@ receive_response(Domain, Socket, Timeout, Brokers) -> {error, Reason} end. --spec(handle_response(Domain :: string(), Address :: inet:ip_address(), Port :: inet:port_number(), Packet :: binary(), Brokers :: brokers()) -> Brokers :: brokers()). -handle_response(Domain, Address, _Port, Packet, Brokers) -> +-spec handle_response(string(), inet:ip_address(), inet:port_number(), binary(), brokers()) -> brokers(). +handle_response(Domain, Address, Port, Packet, Brokers) -> case inet_dns:decode(Packet) of - {ok, Request} -> - Header = Request#dns_rec.header, - if - Header#dns_header.qr -> - case Request#dns_rec.anlist of - [#dns_rr{domain = Domain, data = {0, 0, BrokerPort, "0.0.0.0"}}] -> - Broker = {inet_parse:ntoa(Address), BrokerPort}, - [Broker | Brokers]; - [#dns_rr{domain = Domain, data = {0, 0, BrokerPort, BrokerAddress}}] -> - Broker = {BrokerAddress, BrokerPort}, - [Broker | Brokers]; - _ -> - Brokers - end; - true -> + {ok, DnsRec} -> + handle_response_record(Domain, Address, Port, DnsRec, Brokers); + {error, _Reason} -> + Brokers + end. + +-spec handle_response_record(string(), inet:ip_address(), inet:port_number(), dns_rec(), brokers()) -> brokers(). +handle_response_record(Domain, Address, _Port, DnsRec, Brokers) -> + Header = DnsRec#dns_rec.header, + case Header#dns_header.qr of + true -> + case DnsRec#dns_rec.anlist of + [#dns_rr{domain = Domain, data = {0, 0, BrokerPort, "0.0.0.0"}}] -> + Broker = {inet_parse:ntoa(Address), BrokerPort}, + [Broker | Brokers]; + [#dns_rr{domain = Domain, data = {0, 0, BrokerPort, BrokerAddress}}] -> + Broker = {BrokerAddress, BrokerPort}, + [Broker | Brokers]; + _ -> Brokers end; - {error, _Reason} -> + _ -> Brokers end. diff --git a/src/tsuru_string.erl b/src/tsuru_string.erl index 1b2b941c33c9326a70a486fcbe9975a55e5e58ff..40f5ef4c20b7889385343bba534e468a9a06d8f1 100644 --- a/src/tsuru_string.erl +++ b/src/tsuru_string.erl @@ -1,20 +1,17 @@ -module(tsuru_string). --author("erikh"). -%% API -export([prefix_matches/2, strip_prefix/2]). +-spec prefix_matches(string(), string()) -> boolean(). prefix_matches([H | Prefix], [H | String]) -> prefix_matches(Prefix, String); - prefix_matches([], _String) -> true; - prefix_matches(_Prefix, _String) -> false. +-spec strip_prefix(string(), string()) -> string(). strip_prefix([H | Prefix], [H | String]) -> strip_prefix(Prefix, String); - strip_prefix(_Prefix, String) -> String. diff --git a/test/tsuru_mdns_test.erl b/test/tsuru_mdns_test.erl index a1260868d1cb6b1b1b0cd1c063c0b1f95795c7e4..d0cb3c96625890efd3a40c067dfafe5291ac3bd1 100644 --- a/test/tsuru_mdns_test.erl +++ b/test/tsuru_mdns_test.erl @@ -11,3 +11,6 @@ publish_test() -> tsuru_mdns:publish_broker("_other._tcp.local", "0.0.0.0", 8082), {ok, TestBrokers} = tsuru_mdns:discover_brokers("_test._tcp.local"), ?assert(proplists:is_defined("127.0.0.1", TestBrokers)). + +timeout_test() -> + {ok, TestBrokers} = tsuru_mdns:discover_brokers("_test._tcp.local").