diff --git a/.gitignore b/.gitignore index d1be8c7e288e1154a8d1eafd9306c892832f547a..5e58337f364ce02f547ae11e3225c3487c554aab 100644 --- a/.gitignore +++ b/.gitignore @@ -11,11 +11,6 @@ ebin log erl_crash.dump .rebar -_rel -_deps -_plugins -_tdeps logs _build rebar3 -rebar.lock diff --git a/apps/consul_proxy/src/consul_proxy_app.erl b/apps/consul_proxy/src/consul_proxy_app.erl index bdd722d3669962df865197407742870285516c1c..76e9a55f0384e2b9b010deb44b910f65ed283076 100644 --- a/apps/consul_proxy/src/consul_proxy_app.erl +++ b/apps/consul_proxy/src/consul_proxy_app.erl @@ -18,6 +18,7 @@ start(_StartType, StartArgs) -> lager:set_loglevel(lager_console_backend, application:get_env(consul_proxy, loglevel, info)), Middlewares = [ + consul_proxy_middleware_hijack, vegur_validate_headers, consul_proxy_middleware_acme, consul_proxy_middleware_alias, @@ -48,7 +49,7 @@ start(_StartType, StartArgs) -> ]} ]), - ListenerCount = 100, + ListenerCount = application:get_env(consul_proxy, api_listeners, 100), RanchOptions = [ {port, application:get_env(consul_proxy, api_port, 8082)} diff --git a/apps/consul_proxy/src/consul_proxy_middleware_hijack.erl b/apps/consul_proxy/src/consul_proxy_middleware_hijack.erl index aaad41624dbdd547d5ae9bd77de6be50f89e48f3..a4b12642043c76579d57a94baf959530b2b10222 100644 --- a/apps/consul_proxy/src/consul_proxy_middleware_hijack.erl +++ b/apps/consul_proxy/src/consul_proxy_middleware_hijack.erl @@ -1,6 +1,256 @@ -module(consul_proxy_middleware_hijack). + -behaviour(cowboyku_middleware). +-behaviour(ranch_protocol). +-behavior(gen_server). + +%% API +-export([start_link/1]). + +%% cowboyku_middleware callbacks -export([execute/2]). +%% ranch_protocol callbacks +-export([start_link/4, + init/4]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-include("consul_proxy.hrl"). + +-define(SERVER, ?MODULE). +-define(PBKDF2_SALT_LENGTH, 16). +-define(PBKDF2_ITERATIONS, 4096). +-define(PBKDF2_DERIVED_LENGTH, 32). + +-record(state, { + domain_tab = undefined +}). + +-record(socket_state, { + socket = undefined, + transport = undefined, + timeout = 3000, + client_state = undefined +}). + + +%%%=================================================================== +%%% API +%%%=================================================================== + +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, Args, []). + +%%==================================================================== +%% cowboyku_middleware callbacks +%%==================================================================== + execute(Req, Env) -> - {ok, Req, Env}. + case gen_server:call(?SERVER, {request, Req}) of + {ignored, Req1} -> + {ok, Req1, Env}; + {hijacked, Req1} -> + receive + #{<<"status">> := Status, <<"headers">> := Headers, <<"body">> := Body} -> + {ok, Req2} = cowboyku_req:reply(Status, maps:to_list(Headers), Body, Req1), + {halt, Req2}; + _ -> + {ok, Req1, Env} + end + end. + +%%==================================================================== +%% ranch_protocol callbacks +%%==================================================================== + +start_link(Ref, Socket, Transport, Opts) -> + Pid = spawn_link(?MODULE, init, [Ref, Socket, Transport, Opts]), + {ok, Pid}. + +init(Ref, Socket, Transport, _Opts = []) -> + ok = ranch:accept_ack(Ref), + gen_server:cast(?SERVER, {socket_open, Socket, Transport}), + loop(#socket_state{socket = Socket, transport = Transport}). + +loop(State = #socket_state{socket = Socket, transport = Transport, client_state = unauthorized}) -> + lager:warning("~p", [State]), + Transport:close(Socket); + +loop(State = #socket_state{socket = Socket, transport = Transport, timeout = Timeout}) -> + case Transport:recv(Socket, 0, Timeout) of + {ok, Data} -> + case msgpack:unpack(Data) of + {ok, Terms} -> + loop(gen_server:call(?SERVER, {socket_recv, Terms, State})); + {error, Reason} -> + lager:error("Illegal msgpack data: ~p", [Reason]), + Transport:close(Socket) + end; + {error, closed} -> + ok; + {error, Reason} -> + lager:error("Error receiving data: ~p", [Reason]), + Transport:close(Socket) + end. + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +init(Args) -> + lager:info("Args: ~p", [Args]), + Tab = ets:new(hijacked_domains, [bag, {read_concurrency, true}]), + process_flag(trap_exit, true), + {ok, #state{domain_tab = Tab}}. + +handle_call({socket_recv, Terms = #{<<"action">> := <<"login">>}, SocketState}, _From, State) -> + #{<<"username">> := Username, <<"password">> := Password} = Terms, + case maps:get(Username, users(), undefined) of + undefined -> + {reply, SocketState#socket_state{client_state = unauthorized}, State}; + {Salt, Hash} -> + case pbkdf2:pbkdf2(sha512, Password, Salt, ?PBKDF2_ITERATIONS, ?PBKDF2_DERIVED_LENGTH) of + {ok, Hash} -> + send(SocketState, #{<<"action">> => <<"authorized">>, <<"username">> => Username}), + {reply, SocketState#socket_state{client_state = authorized, timeout = 60000}, State}; + _ -> + {reply, SocketState#socket_state{client_state = unauthorized}, State} + end + end; + +handle_call({socket_recv, Terms = #{<<"action">> := <<"bind">>}, SocketState}, _From, State) -> + #{<<"host">> := Host, <<"path">> := Path} = Terms, + #socket_state{socket = Socket, transport = Transport} = SocketState, + case re:compile(Path) of + {ok, MP} -> + ets:insert(State#state.domain_tab, {Host, MP, Socket, Transport}), + send(SocketState, Terms), + {reply, SocketState#socket_state{client_state = bound}, State}; + {error, {Description, Position}} -> + Message = io_lib:format("Bad path regexp '~s', ~s at position ~p", [Path, Description, Position]), + send(SocketState, #{<<"action">> => <<"error">>, <<"message">> => lists:flatten(Message)}), + {reply, SocketState, State} + end; + +handle_call({socket_recv, #{<<"action">> := <<"ping">>}, SocketState}, _From, State) -> + send(SocketState, #{<<"action">> => <<"pong">>}), + {reply, SocketState, State}; + +handle_call({socket_recv, Response = #{<<"from">> := B64From}, SocketState}, _From, State) -> + {Pid, _Tag} = binary_to_term(base64:decode(B64From)), + Pid ! Response, + {reply, SocketState, State}; + +handle_call({request, Req}, From, State = #state{domain_tab = Tab}) -> + {Host, Req2} = cowboyku_req:host(Req), + Patterns = ets:lookup(Tab, Host), + {reply, hijack(Patterns, Req2, From), State}; + +handle_call(Request, _From, State) -> + lager:warning("Unhandled call: ~p", [Request]), + {reply, ok, State}. + +handle_cast({socket_open, Socket, Transport}, State) -> + lager:notice("~p socket opened: ~p", [Transport, Socket]), + link(Socket), + {noreply, State}; + +handle_cast({socket_close, Socket, Reason}, State = #state{domain_tab = Tab}) -> + ets:match_delete(Tab, {'_', '_', Socket, '_'}), + lager:notice("Socket closed(~p): ~p", [Reason, Socket]), + unlink(Socket), + {noreply, State}; + +handle_cast(Request, State) -> + lager:warning("Unhandled cast: ~p", [Request]), + {noreply, State}. + +handle_info({'EXIT', Socket, Reason}, State) when is_port(Socket) -> + gen_server:cast(?SERVER, {socket_close, Socket, Reason}), + {noreply, State}; + +handle_info(Info, State) -> + lager:warning("Unhandled message: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +users() -> + CacheKey = <<"hijackers">>, + case consul_proxy_utils:cache_get(CacheKey) of + undefined -> + case consul_proxy_utils:get_and_decode(<<"consul_proxy/hijackers">>) of + {ok, Data} -> + Users = parse_users(Data, #{}), + consul_proxy_utils:cache_add(CacheKey, Users), + Users; + _ -> + [] + end; + Value -> + Value + end. + +parse_users([], Parsed) -> + Parsed; + +parse_users([User | Users], Parsed) -> + [Username, Password] = binary:split(User, <<$:>>), + <<Salt:?PBKDF2_SALT_LENGTH/binary, Hash:?PBKDF2_DERIVED_LENGTH/binary>> = base64:decode(Password), + parse_users(Users, Parsed#{Username => {Salt, Hash}}). + +hijack([], Req, _From) -> + {ignored, Req}; + +hijack([{_, RE, Socket, Transport} | Patterns], Req, From) -> + {Path, Req1} = cowboyku_req:path(Req), + case re:run(Path, RE) of + nomatch -> + hijack(Patterns, Req1, From); + _ -> + B64Pid = base64:encode(term_to_binary(From)), + {Map, Req2} = request_to_map(Req1), + Transport:send(Socket, msgpack:pack(Map#{<<"from">> => B64Pid})), + {hijacked, Req2} + end. + +send(#socket_state{socket = Socket, transport = Transport}, Data) -> + send(Socket, Transport, Data). + +send(Socket, Transport, Data) -> + Transport:send(Socket, msgpack:pack(Data)). + +request_to_map(Req) -> + {Id, Req0} = cowboyku_req:meta(request_id, Req), + {Qs, Req1} = cowboyku_req:qs(Req0), + FullPath = case Qs of + <<>> -> + Req1#http_req.path; + _ -> + Path = Req1#http_req.path, + <<Path/binary, "?", Qs/binary>> + end, + {ok, Body, Req2} = cowboyku_req:body(infinity, Req1), + Map = #{ + <<"id">> => Id, + <<"host">> => Req2#http_req.host, + <<"method">> => Req2#http_req.method, + <<"path">> => FullPath, + <<"headers">> => maps:from_list(Req#http_req.headers), + <<"body">> => Body + }, + {Map, Req2}. diff --git a/apps/consul_proxy/src/consul_proxy_sup.erl b/apps/consul_proxy/src/consul_proxy_sup.erl index 8b7fb791a0226c67570ad7a86482cf836d6d9942..a65e96227bb572fa77a4f40bb805cc8c03ca2bbc 100644 --- a/apps/consul_proxy/src/consul_proxy_sup.erl +++ b/apps/consul_proxy/src/consul_proxy_sup.erl @@ -49,4 +49,18 @@ init(Args) -> start => {consul_proxy_loadbalancer, start_link, [Args]} }, - {ok, {SupFlags, [ConsulCacheSpec, ConsulClientSpec, ConsulLoadBalancerSpec]}}. + HijackServerSpec = #{ + id => hijack_server, + start => {consul_proxy_middleware_hijack, start_link, [Args]} + }, + + HijackListenerCount = application:get_env(consul_proxy, hijack_listeners, 100), + HijackRanchOptions = [ + {port, application:get_env(consul_proxy, hijack_port, 8083)} + ], + HijackListenerSpec = ranch:child_spec(hijack_listener, HijackListenerCount, + ranch_tcp, HijackRanchOptions, + consul_proxy_middleware_hijack, [] + ), + + {ok, {SupFlags, [ConsulCacheSpec, ConsulClientSpec, ConsulLoadBalancerSpec, HijackServerSpec, HijackListenerSpec]}}. diff --git a/config/sys.config b/config/sys.config index 31738aef01af506d8f79810c0d1838c867cae89a..b6984ca6babc13074b832686241109d7072a1447 100644 --- a/config/sys.config +++ b/config/sys.config @@ -45,6 +45,9 @@ {http_port, 8080}, {proxy_port, 8081}, {api_port, 8082}, + {api_listeners, 100}, + {hijack_port, 8083}, + {hijack_listeners, 100}, {consul_url, "http://127.0.0.1:8500"}, {cache_size, 1024}, {loglevel, info}, diff --git a/config/test-sys.config b/config/test-sys.config index b4516388a77f001fa97bfb92c3a6b47015c3ed02..1d192ff8a3733d37632edd5d6fcacf1bdcb021b2 100644 --- a/config/test-sys.config +++ b/config/test-sys.config @@ -40,6 +40,9 @@ {http_port, 8080}, {proxy_port, 8081}, {api_port, 8082}, + {api_listeners, 100}, + {hijack_port, 8083}, + {hijack_listeners, 100}, {consul_urls, "http://127.0.0.1:8500"}, {cache_size, 1024}, {loglevel, info}, diff --git a/rebar.lock b/rebar.lock new file mode 100644 index 0000000000000000000000000000000000000000..5ab9781755b16f98011b5fe0d9dd1e1cdaf4e1a0 --- /dev/null +++ b/rebar.lock @@ -0,0 +1,53 @@ +[{<<"cowboyku">>, + {git,"https://github.com/heroku/cowboyku.git", + {ref,"ca64d29abe15c1522ba77283f15de20677aa5889"}}, + 1}, + {<<"cowlib">>, + {git,"https://github.com/ninenines/cowlib.git", + {ref,"63298e8e160031a70efff86a1acde7e7db1fcda6"}}, + 1}, + {<<"erequest_id">>, + {git,"https://github.com/heroku/erequest_id.git", + {ref,"e24f250c119e14975eff038ac30b2f9f7d1ceefa"}}, + 1}, + {<<"goldrush">>, + {git,"git://github.com/DeadZen/goldrush.git", + {ref,"212299233c7e7eb63a97be2777e1c05ebaa58dbe"}}, + 1}, + {<<"jsx">>,{pkg,<<"jsx">>,<<"2.8.0">>},0}, + {<<"lager">>, + {git,"https://github.com/basho/lager.git", + {ref,"b2cb2735713e3021e0761623ff595d53a545438e"}}, + 0}, + {<<"lru">>,{pkg,<<"lru">>,<<"1.3.1">>},0}, + {<<"luerl">>, + {git,"https://github.com/rvirding/luerl.git", + {ref,"01fa43a1a5d68becfe07236a9a71505039474108"}}, + 0}, + {<<"midjan">>, + {git,"https://github.com/heroku/midjan.git", + {ref,"1499e99509a07d5d3a0fbfd412cac40c64ebce0d"}}, + 1}, + {<<"msgpack">>,{pkg,<<"msgpack">>,<<"0.4.0">>},0}, + {<<"pbkdf2">>,{pkg,<<"pbkdf2">>,<<"2.0.0">>},0}, + {<<"quickrand">>, + {git,"https://github.com/okeuday/quickrand.git", + {ref,"cf7ac11a820367496252243e49898480b25009f7"}}, + 2}, + {<<"ranch">>, + {git,"https://github.com/ninenines/ranch.git", + {ref,"741bfb73ca663394ecf7a73410506d66f01985a6"}}, + 1}, + {<<"ranch_proxy_protocol">>, + {git,"https://github.com/heroku/ranch_proxy_protocol.git", + {ref,"5110e6ca4bc25dfb11d4ecf8d003169b95bf8969"}}, + 1}, + {<<"tsuru">>,{pkg,<<"tsuru">>,<<"1.4.0">>},0}, + {<<"uuid">>, + {git,"https://github.com/okeuday/uuid.git", + {ref,"24325ae6309d03897735c3a320aab6f74b189ab7"}}, + 1}, + {<<"vegur">>, + {git,"https://github.com/heroku/vegur.git", + {ref,"612d2de22bc6931dcc2bb0ae9c3e3187c4d77ca1"}}, + 0}].