Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion lib/memcache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,28 @@ defmodule Memcache do
connection_options = Keyword.merge(@default_opts, connection_options)
|> Keyword.update!(:coder, &normalize_coder/1)
state = connection_options |> Keyword.take(extra_opts) |> Enum.into(%{})
{:ok, pid} = Connection.start_link(Keyword.drop(connection_options, extra_opts), options)

{:ok, pid} = connection_options
|> add_flags
|> Keyword.drop(extra_opts)
|> Connection.start_link(options)

state = Map.put(state, :connection, pid)
Registry.associate(pid, state)

{:ok, pid}
end

# When we're using a serializer/coder, then we need to let Connection know to
# set the serialization bit when writing keys. For Dalli compat.
defp add_flags(opts) do
flags = case opts[:coder] do
{Memcache.Coder.Raw, _} -> 0
_ -> 1
end
Keyword.put(opts, :flags, flags)
end

@doc """
Closes the connection to the memcached server.
"""
Expand Down
42 changes: 35 additions & 7 deletions lib/memcache/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ defmodule Memcache.Connection do
end

defp send_and_receive(%State{ sock: sock } = s, from, command, args, opts) do
packet = serialize(command, args)
flags = Keyword.get(s.opts, :flags, 0)
packet = serialize(command, args, 0, flags)
case :gen_tcp.send(sock, packet) do
:ok ->
s = enqueue_receiver(s, from)
Expand All @@ -255,7 +256,8 @@ defmodule Memcache.Connection do
end

defp send_and_receive_quiet(%State{ sock: sock } = s, from, commands) do
{ packet, commands, i } = Enum.reduce(commands, { [], [], 1 }, &accumulate_commands/2)
flags = Keyword.get(s.opts, :flags, 0)
{ packet, commands, i } = Enum.reduce(commands, { [], [], 1 }, &accumulate_commands(&1, &2, flags))
packet = [packet | serialize(:NOOP, [], i)]
case :gen_tcp.send(sock, packet) do
:ok ->
Expand All @@ -271,11 +273,11 @@ defmodule Memcache.Connection do
%{state| receiver_queue: receiver_queue}
end

defp accumulate_commands({ command, args }, { packet, commands, i }) do
{ [packet | serialize(command, args, i)], [{ i, command, args, %{cas: false} } | commands], i + 1 }
defp accumulate_commands({ command, args }, { packet, commands, i }, flags) do
{ [packet | serialize(command, args, i, flags)], [{ i, command, args, %{cas: false} } | commands], i + 1 }
end
defp accumulate_commands({ command, args, options }, { packet, commands, i }) do
{ [packet | serialize(command, args, i)], [{ i, command, args, %{cas: Keyword.get(options, :cas, false)}} | commands], i + 1 }
defp accumulate_commands({ command, args, options }, { packet, commands, i }, flags) do
{ [packet | serialize(command, args, i, flags)], [{ i, command, args, %{cas: Keyword.get(options, :cas, false)}} | commands], i + 1 }
end

defp get_backoff(s) do
Expand Down Expand Up @@ -352,7 +354,33 @@ defmodule Memcache.Connection do
end
end

defp serialize(command, args, opaque \\ 0) do
defp serialize(command, args), do: serialize(command, args, 0)

defp serialize(command, args, opaque) do
apply(Protocol, :to_binary, [command | [opaque | args]])
end

# For Dalli compatibility, we need to set the first bit of flags to 1 when
# using a coder (serializer) with the following commands. We've stored flags
# in our state and now just need to use it when serializing the command.
defp serialize(command, args, opaque, flags)
when command == :SET
when command == :SETQ
when command == :ADD
when command == :ADDQ
when command == :REPLACE
when command == :REPLACEQ do

# to_binary for the above commands can default up to three args: cas, expiry, flags.
# And since flags is the last arg, we have to account for that here.
args = case length(args) do
2 -> args ++ [0, 0, flags]
3 -> args ++ [0, flags]
4 -> args ++ [flags]
end

serialize(command, args, opaque)
end

defp serialize(command, args, opaque, _flags), do: serialize(command, args, opaque)
end