diff --git a/rebar.config b/rebar.config index 64281eb22c913ae299d9e8bfe97ac37602f76af4..cdf8d10cd675832f8ccc04256cf6756cb708977f 100644 --- a/rebar.config +++ b/rebar.config @@ -3,5 +3,6 @@ {deps, [ {getopt, "0.8.2"}, {msgpack, "0.4.0"}, + {ranch, "1.1.0"}, {gun, "1.0.0-pre.1"} ]}. diff --git a/src/hijack.erl b/src/hijack.erl index 232a7eb77ef3af6b365fd4b7168d1f264b41b99b..658003f97dc0555d4a3eeaa99a26ff0cd1618bfa 100644 --- a/src/hijack.erl +++ b/src/hijack.erl @@ -12,6 +12,7 @@ -record(state, { opts = [], + transport = ranch_ssl, consul_socket = undefined, target_host = "localhost", target_port = 80 @@ -29,37 +30,66 @@ main(Args) -> end. execute(State = #state{opts = Opts}) -> + Transport = ranch_ssl, {ConsulHost, ConsulPort} = parse_host_and_port(proplists:get_value(consul, Opts)), - {ok, Socket} = ssl:connect(ConsulHost, ConsulPort, []), - %%{ok, Socket} = gen_tcp:connect(ConsulHost, ConsulPort, [binary, {active, true}]), - {TargetHost, TargetPort} = parse_host_and_port(proplists:get_value(target, Opts)), - NewState = State#state{consul_socket = Socket, target_host = TargetHost, target_port = TargetPort}, - process_flag(trap_exit, true), - link(Socket), - loop(send_to_consul(NewState, #{ - <<"action">> => <<"login">>, - <<"username">> => list_to_binary(proplists:get_value(username, Opts, "")), - <<"password">> => list_to_binary(proplists:get_value(password, Opts, "")) - })). + case Transport:connect(ConsulHost, ConsulPort, []) of + {ok, Socket} -> + {TargetHost, TargetPort} = parse_host_and_port(proplists:get_value(target, Opts)), + NewState = State#state{transport = Transport, consul_socket = Socket, target_host = TargetHost, target_port = TargetPort}, + Parent = self(), + spawn( + fun() -> + transport_loop(Parent, Transport, Socket) + end), + loop(send_to_consul(NewState, #{ + <<"action">> => <<"login">>, + <<"username">> => list_to_binary(proplists:get_value(username, Opts, "")), + <<"password">> => list_to_binary(proplists:get_value(password, Opts, "")) + })); + {error, Reason} -> + io:format("Error connecting to ~s:~p: ~p~n", [ConsulHost, ConsulPort, Reason]), + ok + end. parse_host_and_port(HostAndPort) -> [Host, Port] = string:tokens(HostAndPort, ":"), {Host, list_to_integer(Port)}. +transport_loop(Parent, Transport, Socket) -> + case Transport:recv(Socket, 0, 3000) of + {ok, Data} -> + case msgpack:unpack(Data) of + {ok, Terms} -> + Parent ! {socket_recv, Terms}, + transport_loop(Parent, Transport, Socket); + {error, _Reason} -> + Transport:close(Socket) + end; + {error, timeout} -> + Parent ! socket_timeout, + transport_loop(Parent, Transport, Socket); + {error, closed} -> + Parent ! socket_closed; + {error, Reason} -> + Transport:close(Socket), + Parent ! {socket_error, Reason} + end. + loop(#state{consul_socket = undefined}) -> ok; -loop(State = #state{consul_socket = Socket}) -> +loop(State) -> receive - {tcp_closed, Socket} -> + {socket_recv, Terms} -> + loop(handle(State, Terms)); + {socket_error, _Reason} -> + ok; + socket_timeout -> + loop(send_to_consul(State, #{ + <<"action">> => <<"ping">> + })); + socket_closed -> ok; - {tcp, Socket, Data} -> - case msgpack:unpack(Data) of - {ok, Terms} -> - loop(handle(State, Terms)); - {error, _Reason} -> - gen_tcp:close(Socket) - end; {response, From, Status, Headers, Body} -> loop(send_to_consul(State, #{ <<"from">> => From, @@ -80,15 +110,10 @@ loop(State = #state{consul_socket = Socket}) -> Msg -> io:format("loop recv ~p~n", [Msg]), loop(State) - after - 3000 -> - loop(send_to_consul(State, #{ - <<"action">> => <<"ping">> - })) end. -send_to_consul(State = #state{consul_socket = Socket}, Map) -> - case gen_tcp:send(Socket, msgpack:pack(Map)) of +send_to_consul(State = #state{transport = Transport, consul_socket = Socket}, Map) -> + case Transport:send(Socket, msgpack:pack(Map)) of ok -> State; {error, Reason} -> @@ -109,9 +134,8 @@ handle(State, #{<<"action">> := <<"bind">>, <<"host">> := Host, <<"path">> := Pa io:format("Listening to ~s/~s~n", [Host, Path]), State; -handle(State = #state{consul_socket = Socket}, #{<<"action">> := <<"error">>, <<"message">> := Message}) -> - gen_tcp:close(Socket), - io:format("~s~n", [Message]), +handle(State = #state{transport = Transport, consul_socket = Socket}, #{<<"action">> := <<"error">>, <<"message">> := Message}) -> + Transport:close(Socket), State#state{consul_socket = undefined}; handle(State = #state{target_host = Host, target_port = Port}, Request = #{<<"from">> := _From}) -> @@ -134,7 +158,6 @@ handle(State, Terms) -> gun_loop(Parent, Request = #{<<"from">> := From}, Pid, StreamRef) -> receive {gun_up, Pid, _Protocol} -> - Parent ! {gun_up, Pid}, gun_loop(Parent, Request, Pid, request(Pid, Request)); {gun_down, Pid, _Protocol, _Reason, _, _} -> Parent ! {gun_down, Pid}; @@ -154,18 +177,20 @@ gun_loop_data(Parent, Request = #{<<"from">> := From}, Pid, StreamRef, Status, H Parent ! {gun_down, Pid}; {gun_error, Pid, StreamRef, Reason} -> Parent ! {gun_error, Pid, From, Reason}; - {gun_data, Pid, StreamRef, nofin, Data} -> - gun_loop_data(Parent, Request, Pid, StreamRef, Status, Headers, <<Buffer/binary, Data/binary>>); {gun_data, Pid, StreamRef, fin, Data} -> - Parent ! {response, From, Status, Headers, <<Buffer/binary, Data/binary>>} + Parent ! {response, From, Status, Headers, <<Buffer/binary, Data/binary>>}; + {gun_data, Pid, StreamRef, nofin, Data} -> + gun_loop_data(Parent, Request, Pid, StreamRef, Status, Headers, <<Buffer/binary, Data/binary>>) after 3000 -> Parent ! {gun_timeout, Pid, From} end. -request(Pid, Request = #{<<"method">> := <<"GET">>}) -> +request(Pid, Request = #{<<"method">> := Method}) + when Method == <<"GET">>; Method == <<"HEAD">>;Method == <<"OPTIONS">>; Method == <<"DELETE">> -> #{<<"path">> := Path, <<"headers">> := Headers} = Request, - gun:get(Pid, Path, maps:to_list(Headers)); + gun:request(Pid, Method, Path, maps:to_list(Headers)); -request(Pid, Request = #{<<"method">> := <<"POST">>}) -> +request(Pid, Request = #{<<"method">> := Method}) + when Method == <<"POST">>; Method == <<"PUT">>;Method == <<"PATCH">> -> #{<<"path">> := Path, <<"headers">> := Headers, <<"body">> := Body} = Request, - gun:post(Pid, Path, maps:to_list(Headers), Body). + gun:request(Pid, Method, Path, maps:to_list(Headers), Body).