diff --git a/README.md b/README.md index 8bcb500..db113e7 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,10 @@ Table of Contents * [get_primary_peers](#get_primary_peers) * [get_backup_peers](#get_backup_peers) * [set_peer_down](#set_peer_down) + * [add_server](#add_server) + * [add_peer](#add_peer) + * [remove_server](#remove_server) + * [remove_peer](#remove_peer) * [TODO](#todo) * [Compatibility](#compatibility) * [Installation](#installation) @@ -205,23 +209,87 @@ You can turn on a peer again by providing a `false` value as the 4th argument. [Back to TOC](#table-of-contents) +add_server +----------- +`syntax: ok,err = upstream.add_server(upstream_name,ip:port,weight,max_fails,fail_timeout,is_backup)` + +Add a server to upstream. if the server is exist will return err and notes the server is exist. +Note that this method only add a server in the current Nginx worker process. +You need to synchronize the changes across all the Nginx workers if you want a server-wide change. +Please useing [ngx_lua_upstream_dyusc]() + + Warning: + `it also to add server to ngx_http_upstream_server_t structure ,so you should call add_peer. +[Back to TOC](#table-of-contents) + +add_peer +----------- +`syntax: ok,err = upstream.add_peer(upstream,ip:port)` + +Add a server to back-end peers. if back-end peers is exist will return err and notes the peer is exist. +it's suitable for ip_hash or round_robin hash and least_conn. +Note that this method only add a peer in the current Nginx worker process. +You need to synchronize the changes across all the Nginx workers if you want a server-wide change. +Please useing [ngx_lua_upstream_dyusc]() + Warning: + `if you are using a least_conn and you should update something to below` + + ```nginx + Modified macro variable 'NGX_HTTP_UPSTREAM_LEAST_CONN' 1 ,it's default 0 at 'lua-upstream-nginx-module/src/ngx_http_lua_upstream_module.h' file. + ``` + +[Back to TOC](#table-of-contents) + +remove_server +----------- +`syntax: ok,err = upstream.remove_server(upstream,ip:port)` + +Remove a server from upstream. if the server is not exist will return err and notes the server is not found. +Note that this method only remove a server in the current Nginx worker process. +You need to synchronize the changes across all the Nginx workers if you want a server-wide change. +Please useing [ngx_lua_upstream_dyusc]() + Warning: + `it also to add server to ngx_http_upstream_server_t structure ,so you should call add_peer. + + +[Back to TOC](#table-of-contents) + +remove_peer +----------- +`syntax: ok,err = upstream.remove_peer(upstream,ip:port)` + +Remove a server to back-end peers. if back-end peers not exist will return err and notes the peer is not found. +it's suitable for ip_hash or round_robin hash and least_conn. +Note that this method only remove a peer in the current Nginx worker process. +You need to synchronize the changes across all the Nginx workers if you want a server-wide change. +Please useing [ngx_lua_upstream_dyusc]() + Warning: + `if you are using a least_conn and you should update something to below` + + ```nginx + Modified macro variable 'NGX_HTTP_UPSTREAM_LEAST_CONN' 1 ,it's default 0 at 'lua-upstream-nginx-module/src/ngx_http_lua_upstream_module.h' file. + ``` + +[Back to TOC](#table-of-contents) + + + + TODO ==== -* Add API to add or remove servers to existing upstream groups. [Back to TOC](#table-of-contents) Compatibility ============= -The following versions of Nginx should work with this module: +The following versions of OpenResty should work with this module: -* **1.9.x** (last tested: 1.9.2) -* **1.8.x** -* **1.7.x** (last tested: 1.7.10) -* **1.6.x** -* **1.5.x** (last tested: 1.5.12) +* **1.7.10.x** (last tested: 1.7.10.2) +* **1.7.7.x** (last tested: 1.7.7.2) +* **1.7.4.1** +* **1.7.2.1** [Back to TOC](#table-of-contents) diff --git a/src/ddebug.h b/src/ddebug.h index dc18430..466bfd1 100644 --- a/src/ddebug.h +++ b/src/ddebug.h @@ -7,7 +7,6 @@ #ifndef _DDEBUG_H_INCLUDED_ #define _DDEBUG_H_INCLUDED_ - #include #include #include diff --git a/src/ngx_http_lua_upstream_module.c b/src/ngx_http_lua_upstream_module.c index 0d729f5..caa2afc 100644 --- a/src/ngx_http_lua_upstream_module.c +++ b/src/ngx_http_lua_upstream_module.c @@ -14,94 +14,1364 @@ #include #include #include "ngx_http_lua_api.h" +#include "ngx_http_lua_upstream_module.h" +#define NGX_DELAY_DELETE 100 * 1000 + ngx_module_t ngx_http_lua_upstream_module; -static ngx_int_t ngx_http_lua_upstream_init(ngx_conf_t *cf); -static int ngx_http_lua_upstream_create_module(lua_State * L); -static int ngx_http_lua_upstream_get_upstreams(lua_State * L); -static int ngx_http_lua_upstream_get_servers(lua_State * L); -static ngx_http_upstream_main_conf_t * - ngx_http_lua_upstream_get_upstream_main_conf(lua_State *L); -static int ngx_http_lua_upstream_get_primary_peers(lua_State * L); -static int ngx_http_lua_upstream_get_backup_peers(lua_State * L); -static int ngx_http_lua_get_peer(lua_State *L, - ngx_http_upstream_rr_peer_t *peer, ngx_uint_t id); -static ngx_http_upstream_srv_conf_t * - ngx_http_lua_upstream_find_upstream(lua_State *L, ngx_str_t *host); -static ngx_http_upstream_rr_peer_t * - ngx_http_lua_upstream_lookup_peer(lua_State *L); -static int ngx_http_lua_upstream_set_peer_down(lua_State * L); +static ngx_int_t ngx_http_lua_upstream_init(ngx_conf_t *cf); +static int ngx_http_lua_upstream_create_module(lua_State * L); +static int ngx_http_lua_upstream_get_upstreams(lua_State * L); +static int ngx_http_lua_upstream_get_servers(lua_State * L); +static ngx_http_upstream_main_conf_t * + ngx_http_lua_upstream_get_upstream_main_conf(lua_State *L); +static int ngx_http_lua_upstream_get_primary_peers(lua_State * L); +static int ngx_http_lua_upstream_get_backup_peers(lua_State * L); +static int ngx_http_lua_get_peer(lua_State *L, + ngx_http_upstream_rr_peer_t *peer, ngx_uint_t id); +static ngx_http_upstream_srv_conf_t * + ngx_http_lua_upstream_find_upstream(lua_State *L, ngx_str_t *host); +static ngx_http_upstream_rr_peer_t * + ngx_http_lua_upstream_lookup_peer(lua_State *L); +static int ngx_http_lua_upstream_set_peer_down(lua_State * L); +static int ngx_http_lua_upstream_set_peer_weight(lua_State * L); +static int ngx_http_lua_upstream_set_peer_max_fails(lua_State * L); +static int ngx_http_lua_upstream_set_peer_fail_timeout(lua_State * L); +static int ngx_http_lua_upstream_add_server(lua_State * L); +static ngx_http_upstream_server_t* + ngx_http_lua_upstream_compare_server(ngx_http_upstream_srv_conf_t * us, ngx_url_t u); +static ngx_http_upstream_srv_conf_t * + ngx_http_lua_upstream_check_peers(lua_State * L, ngx_url_t u, ngx_http_upstream_server_t ** srv); +static int + ngx_http_lua_upstream_exist_peer(ngx_http_upstream_rr_peers_t * peers, ngx_url_t u); +static int ngx_http_lua_upstream_add_peer(lua_State * L); +static int ngx_http_lua_upstream_remove_server(lua_State * L); +static int ngx_http_lua_upstream_remove_peer(lua_State * L); +static void ngx_http_lua_upstream_event_init(void *peers); +static void ngx_http_lua_upstream_add_delay_delete(ngx_event_t *event); +static ngx_int_t ngx_pfree_and_delay(ngx_pool_t *pool, void *p); +static void * ngx_prealloc(ngx_pool_t *pool, void *p, size_t old_size, size_t new_size, ngx_uint_t flag); + +#if (NGX_HTTP_UPSTREAM_CONSISTENT_HASH) + +static int +ngx_http_upstream_chash_cmp_points(const void *one, const void *two); +static void +ngx_http_upstream_consistent_hash(ngx_http_upstream_rr_peer_t *peer, ngx_http_upstream_chash_points_t *points); +static int +ngx_http_lua_upstream_consistent_hash_init(lua_State * L, ngx_http_upstream_srv_conf_t *uscf); +static int +ngx_http_lua_upstream_remove_peer_chash(lua_State * L, ngx_http_upstream_srv_conf_t *us); + +#endif + +#if (NGX_HTTP_UPSTREAM_LEAST_CONN) + +static int +ngx_http_lua_upstream_least_conn_init(lua_State * L, ngx_http_upstream_srv_conf_t *uscf, ngx_uint_t flag); +static int +ngx_http_lua_upstream_remover_peer_least_conn(lua_State * L, ngx_http_upstream_srv_conf_t *us, ngx_uint_t pos); + +#endif + +static ngx_http_module_t ngx_http_lua_upstream_ctx = { + NULL, /* preconfiguration */ + ngx_http_lua_upstream_init, /* postconfiguration */ + NULL, /* create main configuration */ + NULL, /* init main configuration */ + NULL, /* create server configuration */ + NULL, /* merge server configuration */ + NULL, /* create location configuration */ + NULL /* merge location configuration */ +}; + + +ngx_module_t ngx_http_lua_upstream_module = { + NGX_MODULE_V1, + &ngx_http_lua_upstream_ctx, /* module context */ + NULL, /* module directives */ + NGX_HTTP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +typedef struct { + ngx_event_t delay_delete_ev; + + time_t start_sec; + ngx_msec_t start_msec; + + void *data; +} ngx_delay_event_t; + + +static void +ngx_http_lua_upstream_event_init(void *peers) +{ + ngx_time_t *tp; + ngx_delay_event_t *delay_event; + + + delay_event = ngx_calloc(sizeof(*delay_event), ngx_cycle->log); + if (delay_event == NULL) { + ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, + "http_lua_upstream_event_init: calloc failed"); + return; + } + + tp = ngx_timeofday(); + delay_event->start_sec = tp->sec; + delay_event->start_msec = tp->msec; + + delay_event->delay_delete_ev.handler = ngx_http_lua_upstream_add_delay_delete; + delay_event->delay_delete_ev.log = ngx_cycle->log; + delay_event->delay_delete_ev.data = delay_event; + delay_event->delay_delete_ev.timer_set = 0; + + + delay_event->data = peers; + ngx_add_timer(&delay_event->delay_delete_ev, NGX_DELAY_DELETE); + + return; +} + + +static void +ngx_http_lua_upstream_add_delay_delete(ngx_event_t *event) +{ + ngx_uint_t i; + ngx_connection_t *c; + ngx_delay_event_t *delay_event; + ngx_http_request_t *r=NULL; + ngx_http_log_ctx_t *ctx=NULL; + void *peers=NULL; + + delay_event = event->data; + + c = ngx_cycle->connections; + for (i = 0; i < ngx_cycle->connection_n; i++) { + + if (c[i].fd == (ngx_socket_t) - 1) { + continue; + } else { + + if (c[i].log->data != NULL) { + ctx = c[i].log->data; + r = ctx->current_request; + } + } + + if (r) { + if (r->start_sec < delay_event->start_sec) { + ngx_add_timer(&delay_event->delay_delete_ev, NGX_DELAY_DELETE); + return; + } + + if (r->start_sec == delay_event->start_sec) { + + if (r->start_msec <= delay_event->start_msec) { + ngx_add_timer(&delay_event->delay_delete_ev, NGX_DELAY_DELETE); + return; + } + } + } + } + + peers = delay_event->data; + + if (peers != NULL) { + + ngx_free(peers); + peers = NULL; + } + + + ngx_free(delay_event); + + delay_event = NULL; + + return; +} + + +static ngx_int_t +ngx_http_lua_upstream_init(ngx_conf_t *cf) +{ + if (ngx_http_lua_add_package_preload(cf, "ngx.upstream", + ngx_http_lua_upstream_create_module) + != NGX_OK) + { + return NGX_ERROR; + } + + return NGX_OK; +} + + +static int +ngx_http_lua_upstream_create_module(lua_State * L) +{ + lua_createtable(L, 0, 1); + + lua_pushcfunction(L, ngx_http_lua_upstream_get_upstreams); + lua_setfield(L, -2, "get_upstreams"); + + lua_pushcfunction(L, ngx_http_lua_upstream_get_servers); + lua_setfield(L, -2, "get_servers"); + + lua_pushcfunction(L, ngx_http_lua_upstream_get_primary_peers); + lua_setfield(L, -2, "get_primary_peers"); + + lua_pushcfunction(L, ngx_http_lua_upstream_get_backup_peers); + lua_setfield(L, -2, "get_backup_peers"); + + lua_pushcfunction(L, ngx_http_lua_upstream_set_peer_down); + lua_setfield(L, -2, "set_peer_down"); + + lua_pushcfunction(L, ngx_http_lua_upstream_set_peer_weight); + lua_setfield(L, -2, "set_peer_weight"); + + lua_pushcfunction(L, ngx_http_lua_upstream_set_peer_max_fails); + lua_setfield(L, -2, "set_peer_max_fails"); + + lua_pushcfunction(L, ngx_http_lua_upstream_set_peer_fail_timeout); + lua_setfield(L, -2, "set_peer_fail_timeout"); + + lua_pushcfunction(L, ngx_http_lua_upstream_add_server); + lua_setfield(L, -2, "add_server"); + + lua_pushcfunction(L, ngx_http_lua_upstream_add_peer); + lua_setfield(L, -2, "add_peer"); + + lua_pushcfunction(L, ngx_http_lua_upstream_remove_server); + lua_setfield(L, -2, "remove_server"); + + lua_pushcfunction(L, ngx_http_lua_upstream_remove_peer); + lua_setfield(L, -2, "remove_peer"); + + return 1; +} + + +/* + * The function is compare server as upstream server + * if exists and return upstream_server_t else return + * NULL. + * +*/ +static ngx_http_upstream_server_t* +ngx_http_lua_upstream_compare_server(ngx_http_upstream_srv_conf_t * us, ngx_url_t u ) +{ + ngx_uint_t i, j; + size_t len; + ngx_http_upstream_server_t *server = NULL; + + if (us->servers == NULL || us->servers->nelts == 0) { + return NULL; + } + + server = us->servers->elts; + + for (i = 0; i < us->servers->nelts; ++i) { + for(j = 0; j < server[i].naddrs; ++j) { + + len = server[i].addrs[j].name.len; + if (len == u.url.len + && ngx_memcmp(u.url.data, server[i].addrs[j].name.data, u.url.len) == 0) { + + return &server[i]; + } + } + } + + return NULL; +} + + +/* + * the function is set the specified server weight + * +*/ +static int +ngx_http_lua_upstream_set_peer_weight(lua_State * L) +{ + ngx_http_upstream_rr_peer_t *peer; + ngx_str_t host; + ngx_http_upstream_srv_conf_t *us; + ngx_http_upstream_rr_peers_t *peers; + ngx_int_t diff_weight; + ngx_int_t new_weight; + + if (lua_gettop(L) != 4) { + return luaL_error(L, "exactly 4 arguments expected"); + } + + peer = ngx_http_lua_upstream_lookup_peer(L); + if (peer == NULL) { + return 2; + } + + new_weight = (ngx_int_t) luaL_checkint(L, 4); + if (new_weight < 1){ + lua_pushnil(L); + lua_pushliteral(L, "must be greater than 0"); + return 2; + } + + + diff_weight = new_weight - peer->weight; + peer->weight = new_weight; + peer->effective_weight = new_weight; + peer->current_weight += diff_weight; + + // find upstream, in order to update weighted & total_weight + host.data = (u_char *) luaL_checklstring(L, 1, &host.len); + us = ngx_http_lua_upstream_find_upstream(L, &host); + if (us == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "upstream not found\n"); + return 2; + } + + peers = us->peer.data; + peers->total_weight += diff_weight; + peers->weighted = (peers->total_weight == peers->number); + + return 1; +} + + +/* + * the function is set the specified server max_fails + * +*/ +static int +ngx_http_lua_upstream_set_peer_max_fails(lua_State * L) +{ + ngx_http_upstream_rr_peer_t *peer; + ngx_uint_t new_max_fails; + + if (lua_gettop(L) != 4) { + return luaL_error(L, "exactly 4 arguments expected"); + } + + peer = ngx_http_lua_upstream_lookup_peer(L); + if (peer == NULL) { + return 2; + } + + new_max_fails = (ngx_uint_t) luaL_checkint(L, 4); + if (new_max_fails < 1) { + lua_pushnil(L); + lua_pushliteral(L, "must be greater than 0"); + return 2; + } + + + peer->max_fails = new_max_fails; + + return 1; +} + + +/* + * the function is set the specified server fail_timeout + * +*/ +static int +ngx_http_lua_upstream_set_peer_fail_timeout(lua_State * L) +{ + ngx_http_upstream_rr_peer_t *peer; + time_t new_fail_timeout; + + if (lua_gettop(L) != 4) { + return luaL_error(L, "exactly 4 arguments expected"); + } + + peer = ngx_http_lua_upstream_lookup_peer(L); + if (peer == NULL) { + return 2; + } + + new_fail_timeout = (time_t) luaL_checkint(L, 4); + if (new_fail_timeout < 1){ + lua_pushnil(L); + lua_pushliteral(L, "must be greater than 0"); + return 2; + } + + + peer->fail_timeout = new_fail_timeout; + + return 1; +} + + +/* + * The function is dynamically add a server to upstream + * server ,but not added it to the back-end peer. + * +*/ +static int +ngx_http_lua_upstream_add_server(lua_State * L) +{ + + ngx_str_t host; + ngx_http_upstream_server_t *us; + ngx_http_upstream_srv_conf_t *uscf; + ngx_url_t u; + ngx_http_request_t *r; + ngx_int_t weight, max_fails; + ngx_uint_t backup; + time_t fail_timeout; + u_char *p; + + if (lua_gettop(L) != 6) { + // six param is :"upstream name", "ip:port" , "weight" , "max_fails", + //"fail_time", "backup" + // for lua code , you must pass this five param, is none ,you should + // consider pass default value. + return luaL_error(L, "exactly six argument expected"); + } + + r = ngx_http_lua_get_request(L); + if (r == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "get request error \n"); + return 2; + } + + host.data = (u_char *) luaL_checklstring(L, 1, &host.len); + + ngx_memzero(&u, sizeof (ngx_url_t)); + p = (u_char *) luaL_checklstring(L, 2, &u.url.len); + u.default_port = 80; + + weight = (ngx_int_t) luaL_checkint(L, 3); + max_fails = (ngx_int_t) luaL_checkint(L, 4); + fail_timeout = (time_t) luaL_checklong(L, 5); + backup = lua_toboolean(L, 6); + +#if (NGX_DEBUG) + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, "%s %s params: %s,%s,%d,%d,%d\n", __FILE__,__FUNCTION__, host.data, p, weight, max_fails, fail_timeout); +#endif + + uscf = ngx_http_lua_upstream_find_upstream(L, &host); + if (uscf == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "upstream not found\n"); + return 2; + } + + // lua virtual machine memory is stack,so dup a memory + u.url.data = ngx_pcalloc(uscf->servers->pool, u.url.len+1); + ngx_memcpy(u.url.data, p, u.url.len); + + if (ngx_http_lua_upstream_compare_server(uscf, u) != NULL) { + lua_pushnil(L); + lua_pushliteral(L,"this server is exist\n"); + return 2; + } + + if (uscf->servers == NULL || uscf->servers->nelts == 0) { + lua_pushliteral(L, "upstream has no server before!\n"); + lua_newtable(L); + return 2; + + } else { + if (ngx_parse_url(uscf->servers->pool, &u) != NGX_OK) { + if (u.err) { + lua_pushnil(L); + lua_pushliteral(L, "url parser error"); + return 2; + } + } + + us = ngx_array_push(uscf->servers); + if (us == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "us push uscf->servers failed\n"); + return 2; + } + + ngx_memzero(us, sizeof (ngx_http_upstream_server_t)); + + us->name = u.url; + us->addrs = u.addrs; + us->naddrs = u.naddrs; + us->weight = weight; + us->max_fails = max_fails; + us->fail_timeout = fail_timeout; + us->backup = backup; + } + + lua_pushboolean(L, 1); + return 1; +} + + +/* + * The function is remove a server from ngx_http_upstream_srv_conf_t servers. + * +*/ +static int +ngx_http_lua_upstream_remove_server(lua_State * L) +{ + ngx_uint_t i, j, k; + size_t len; + ngx_str_t host; + ngx_array_t *servers; + ngx_http_upstream_server_t *us, *server; + ngx_http_upstream_srv_conf_t *uscf; + ngx_http_request_t *r; + ngx_url_t u; + size_t old_size, new_size; + + if (lua_gettop(L) != 2) { + // two param is : "bar","ip:port" + // for lua code , you must pass this two param, is none ,you should + // consider pass default value. + lua_pushnil(L); + lua_pushliteral(L, "exactly two argument expected\n"); + return 2; + } + + r = ngx_http_lua_get_request(L); + if (r == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "get request error \n"); + return 2; + } + + host.data = (u_char *) luaL_checklstring(L, 1, &host.len); + + ngx_memzero(&u, sizeof (ngx_url_t)); + u.url.data = (u_char *) luaL_checklstring(L, 2, &u.url.len); + u.default_port = 80; + +#if (NGX_DEBUG) + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, "%s %s params: %s\n", __FILE__,__FUNCTION__ , u.url.data ); +#endif + + uscf = ngx_http_lua_upstream_find_upstream(L, &host); + if (uscf == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "upstream not found\n"); + return 2; + } + + if (ngx_http_lua_upstream_compare_server(uscf, u) == NULL) { + lua_pushnil(L); + lua_pushliteral(L,"not found this server\n"); + return 2; + } + + if (uscf->servers->nelts == 1) { + lua_pushnil(L); + lua_pushliteral(L, "upstream last one is not allowed to delete\n"); + return 2; + } + + server = uscf->servers->elts; + + servers = ngx_array_create(ngx_cycle->pool, uscf->servers->nelts, sizeof(ngx_http_upstream_server_t)); + if (servers == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "servers create fail\n"); + return 2; + } + + for (i = 0; i < uscf->servers->nelts; i++) { + if (server[i].naddrs == 1) { + + len = server[i].addrs->name.len; + if (len == u.url.len + && ngx_memcmp(u.url.data, server[i].addrs->name.data, u.url.len) == 0) { + continue; + } + + } else { + for (j = 0; j < server[i].naddrs; ++j) { + + len = server[i].addrs[j].name.len; + if (len == u.url.len + && ngx_memcmp(u.url.data, server[i].addrs[j].name.data, u.url.len) == 0) { + for (k = j; k < server[i].naddrs -1; ++k) { + server[i].addrs[k] = server[i].addrs[k+1]; + } + + old_size = server[i].naddrs * sizeof(ngx_addr_t); + new_size = (server[i].naddrs - 1) * sizeof(ngx_addr_t); + + server[i].addrs = ngx_prealloc(ngx_cycle->pool, server[i].addrs, old_size, new_size, 0); + server[i].naddrs -= 1; + break; + } + } + } + + us = ngx_array_push(servers); + ngx_memzero(us, sizeof(ngx_http_upstream_server_t)); + us->name = server[i].name; + us->addrs = server[i].addrs; + us->naddrs = server[i].naddrs; + us->weight = server[i].weight; + us->max_fails = server[i].max_fails; + us->fail_timeout = server[i].fail_timeout; + us->backup = server[i].backup; + us->down = server[i].down; + } + + ngx_array_destroy(uscf->servers); + uscf->servers = servers; + + lua_pushboolean(L, 1); + return 1; +} + + +#if (NGX_HTTP_UPSTREAM_CONSISTENT_HASH) + +static int +ngx_http_upstream_chash_cmp_points(const void *one, const void *two) +{ + ngx_http_upstream_chash_point_t *first = + (ngx_http_upstream_chash_point_t *) one; + ngx_http_upstream_chash_point_t *second = + (ngx_http_upstream_chash_point_t *) two; + + if (first->hash < second->hash) { + return -1; + + } else if (first->hash > second->hash) { + return 1; + + } else { + return 0; + } +} + +static void +ngx_http_upstream_consistent_hash(ngx_http_upstream_rr_peer_t *peer, + ngx_http_upstream_chash_points_t *points) +{ + ngx_str_t *server; + u_char *host, *port, c; + ngx_uint_t npoints, j; + uint32_t hash, base_hash; + size_t host_len, port_len; + union { + uint32_t value; + u_char byte[4]; + } prev_hash; + + + server = &peer->server; + if (server->len >= 5 + && ngx_strncasecmp(server->data, (u_char *) "unix:", 5) == 0) + { + host = server->data + 5; + host_len = server->len - 5; + port = NULL; + port_len = 0; + goto done; + } + + for (j = 0; j < server->len; j++) { + c = server->data[server->len - j - 1]; + + if (c == ':') { + host = server->data; + host_len = server->len - j - 1; + port = server->data + server->len - j; + port_len = j; + goto done; + } + + if (c < '0' || c > '9') { + break; + } + } + + host = server->data; + host_len = server->len; + port = NULL; + port_len = 0; + + done: + + ngx_crc32_init(base_hash); + ngx_crc32_update(&base_hash, host, host_len); + ngx_crc32_update(&base_hash, (u_char *) "", 1); + ngx_crc32_update(&base_hash, port, port_len); + + prev_hash.value = 0; + npoints = peer->weight * 160; + + for(j = 0; j < npoints; j++) { + hash = base_hash; + + ngx_crc32_update(&hash, prev_hash.byte, 4); + ngx_crc32_final(hash); + + points->point[points->number].hash = hash; + points->point[points->number].server = server; + points->number++; +#if (NGX_HAVE_LITTLE_ENDIAN) + prev_hash.value = hash; +#else + prev_hash.byte[0] = (u_char) (hash & 0xff); + prev_hash.byte[1] = (u_char) ((hash >> 8) & 0xff); + prev_hash.byte[2] = (u_char) ((hash >> 16) & 0xff); + prev_hash.byte[3] = (u_char) ((hash >> 24) & 0xff); +#endif + } +} + + +static int +ngx_http_lua_upstream_consistent_hash_init(lua_State * L, + ngx_http_upstream_srv_conf_t *uscf) +{ + ngx_uint_t npoints, i, j; + ngx_http_upstream_chash_points_t *points; + ngx_http_upstream_hash_srv_conf_t *hcf; + size_t old_size, new_size; + ngx_http_upstream_rr_peer_t *peer; + ngx_http_upstream_rr_peers_t *peers; + + hcf = ngx_http_conf_upstream_srv_conf(uscf, ngx_http_upstream_hash_module); + if(hcf->points == NULL) { + return 0; + } + + peers = uscf->peer.data; + npoints = (peers->total_weight - peers->peer[peers->number - 1].weight) * 160; + old_size = sizeof(ngx_http_upstream_chash_points_t) + + sizeof(ngx_http_upstream_chash_point_t) * (npoints - 1); + new_size = old_size + + sizeof(ngx_http_upstream_chash_point_t) * peers->peer[peers->number - 1].weight * 160; + + points = ngx_prealloc(ngx_cycle->pool, hcf->points, old_size, new_size, 0); + if (points == NULL ) { + lua_pushnil(L); + lua_pushliteral(L, "points prealloc fail\n"); + return 2; + } + + hcf->points = points; + peer = &peers->peer[peers->number - 1]; + + ngx_http_upstream_consistent_hash(peer, points); + + ngx_qsort(points->point, + points->number, + sizeof(ngx_http_upstream_chash_point_t), + ngx_http_upstream_chash_cmp_points); + + for (i = 0, j = 1; j < points->number; j++) { + if (points->point[i].hash != points->point[j].hash) { + points->point[++i] = points->point[j]; + } + } + + points->number = i + 1; + + return 0; +} + + +static int +ngx_http_lua_upstream_remove_peer_chash(lua_State * L, + ngx_http_upstream_srv_conf_t *us) +{ + ngx_uint_t npoints, i, j; + ngx_http_upstream_rr_peer_t *peer; + ngx_http_upstream_rr_peers_t *peers; + ngx_http_upstream_chash_points_t *points; + ngx_http_upstream_hash_srv_conf_t *hcf; + size_t size; + + + hcf = ngx_http_conf_upstream_srv_conf(us, ngx_http_upstream_hash_module); + if(hcf->points == NULL) { + return 0; + } + + ngx_pfree(ngx_cycle->pool, hcf->points); + + peers = us->peer.data; + npoints = peers->total_weight * 160; + + size = sizeof(ngx_http_upstream_chash_points_t) + + sizeof(ngx_http_upstream_chash_point_t) * (npoints - 1); + + points = ngx_palloc(ngx_cycle->pool, size); + if (points == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "points palloc fail\n"); + return 2; + } + + points->number = 0; + + for (i = 0; i < peers->number; i++) { + peer = &peers->peer[i]; + ngx_http_upstream_consistent_hash(peer, points); + + } + + ngx_qsort(points->point, + points->number, + sizeof(ngx_http_upstream_chash_point_t), + ngx_http_upstream_chash_cmp_points); + for (i = 0, j = 1; j < points->number; j++) { + if (points->point[i].hash != points->point[j].hash) { + points->point[++i] = points->point[j]; + } + } -static ngx_http_module_t ngx_http_lua_upstream_ctx = { - NULL, /* preconfiguration */ - ngx_http_lua_upstream_init, /* postconfiguration */ - NULL, /* create main configuration */ - NULL, /* init main configuration */ - NULL, /* create server configuration */ - NULL, /* merge server configuration */ - NULL, /* create location configuration */ - NULL /* merge location configuration */ -}; + points->number = i + 1; + hcf->points = points; + + return 0; +} -ngx_module_t ngx_http_lua_upstream_module = { - NGX_MODULE_V1, - &ngx_http_lua_upstream_ctx, /* module context */ - NULL, /* module directives */ - NGX_HTTP_MODULE, /* module type */ - NULL, /* init master */ - NULL, /* init module */ - NULL, /* init process */ - NULL, /* init thread */ - NULL, /* exit thread */ - NULL, /* exit process */ - NULL, /* exit master */ - NGX_MODULE_V1_PADDING -}; +#endif -static ngx_int_t -ngx_http_lua_upstream_init(ngx_conf_t *cf) +#if (NGX_HTTP_UPSTREAM_LEAST_CONN) + +static int +ngx_http_lua_upstream_least_conn_init(lua_State * L, + ngx_http_upstream_srv_conf_t *uscf, ngx_uint_t flag) { - if (ngx_http_lua_add_package_preload(cf, "ngx.upstream", - ngx_http_lua_upstream_create_module) - != NGX_OK) - { - return NGX_ERROR; + ngx_http_upstream_rr_peers_t *peers; + ngx_http_upstream_least_conn_conf_t *lcf; + ngx_uint_t *conns, n; + size_t old_size, new_size; + + + lcf = ngx_http_conf_upstream_srv_conf(uscf, + ngx_http_upstream_least_conn_module); + if(lcf->conns == NULL) { + return 0; } - return NGX_OK; + peers = uscf->peer.data; + n = peers->number; + n += peers->next ? peers->next->number : 0; + new_size = sizeof(ngx_uint_t) * n; + old_size = new_size - sizeof(ngx_uint_t); + + conns = ngx_prealloc(ngx_cycle->pool, lcf->conns, old_size, new_size, 0); + if (conns == NULL ) { + lua_pushnil(L); + lua_pushliteral(L, "conns prealloc fail\n"); + return 2; + } + + n = flag ? peers->number : peers->next->number; + conns[n - 1] = 0; + lcf->conns = conns; + + return 0; } static int -ngx_http_lua_upstream_create_module(lua_State * L) +ngx_http_lua_upstream_remover_peer_least_conn(lua_State * L, + ngx_http_upstream_srv_conf_t *us, ngx_uint_t pos) { - lua_createtable(L, 0, 1); + ngx_http_upstream_least_conn_conf_t *lcf; + ngx_uint_t *conns; + ngx_http_upstream_rr_peers_t *peers; + ngx_uint_t i, n; + size_t old_size, new_size; - lua_pushcfunction(L, ngx_http_lua_upstream_get_upstreams); - lua_setfield(L, -2, "get_upstreams"); - lua_pushcfunction(L, ngx_http_lua_upstream_get_servers); - lua_setfield(L, -2, "get_servers"); + peers = us->peer.data; - lua_pushcfunction(L, ngx_http_lua_upstream_get_primary_peers); - lua_setfield(L, -2, "get_primary_peers"); + lcf = ngx_http_conf_upstream_srv_conf(us, + ngx_http_upstream_least_conn_module); - lua_pushcfunction(L, ngx_http_lua_upstream_get_backup_peers); - lua_setfield(L, -2, "get_backup_peers"); + if(lcf->conns == NULL) { + return 0; + } - lua_pushcfunction(L, ngx_http_lua_upstream_set_peer_down); - lua_setfield(L, -2, "set_peer_down"); + for (i = pos; i < peers->number; i++) { + lcf->conns[i] = lcf->conns[i + 1]; + } + + n = peers->number; + n += peers->next ? peers->next->number : 0; + new_size = sizeof(ngx_uint_t) * n; + old_size = new_size + sizeof(ngx_uint_t); + + conns = ngx_prealloc(ngx_cycle->pool, lcf->conns, old_size, new_size, 0); + if (conns == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "conns realloc fail"); + return 2; + } + + lcf->conns = conns; + + return 0; +} + + +#endif + +/* + * The function is add a server to back-end peers + * it's suitable for ip_hash round_robin least_conn, + * hash the peer's weight ip port ... depends on + * nginx.conf. +*/ +static int +ngx_http_lua_upstream_add_peer(lua_State * L) +{ + ngx_uint_t n; + ngx_http_upstream_server_t *us; + ngx_http_upstream_srv_conf_t *uscf; + ngx_http_upstream_rr_peer_t peer; + ngx_http_upstream_rr_peers_t *peers; + ngx_http_upstream_rr_peers_t *backup; + ngx_http_request_t *r; + ngx_url_t u; + size_t old_size, new_size; +#if (NGX_HTTP_UPSTREAM_LEAST_CONN) + ngx_uint_t flag; + + flag = 0; +#endif + + if (lua_gettop(L) != 2) { + // two param is : "upstream" "ip:port" + // for lua code , you must pass this one param, is none ,you should + // consider pass default value. + lua_pushnil(L); + lua_pushliteral(L, "exactly two argument expected\n"); + return 2; + } + + r = ngx_http_lua_get_request(L); + if (r == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "get request error \n"); + return 2; + } + + ngx_memzero(&u, sizeof (ngx_url_t)); + u.url.data = (u_char *) luaL_checklstring(L, 2, &u.url.len); + u.default_port = 80; + +#if (NGX_DEBUG) + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, "%s %s params: %s\n", __FILE__,__FUNCTION__ , u.url.data ); +#endif + + uscf = ngx_http_lua_upstream_check_peers(L, u, &us); + if ( uscf == NULL || us == NULL) { + return 2; + } + + peers = uscf->peer.data; + + ngx_memzero(&peer, sizeof (ngx_http_upstream_rr_peer_t)); + + if ( !us->backup ) { + if (ngx_http_lua_upstream_exist_peer(peers, u) == 1) { + lua_pushnil(L); + lua_pushliteral(L, "the peer is exist\n"); + return 2; + } + + n = peers != NULL ? (peers->number - 1) : 0; + old_size = n * sizeof(ngx_http_upstream_rr_peer_t) + + sizeof(ngx_http_upstream_rr_peers_t); + new_size = old_size + sizeof(ngx_http_upstream_rr_peer_t); + + peers = ngx_prealloc(ngx_cycle->pool, uscf->peer.data, old_size, new_size, 1); + if (peers == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "peers pcalloc fail\n"); + return 2; + } + + peer.weight = us->weight; + peer.effective_weight = us->weight; + peer.current_weight= 0; + peer.max_fails = us->max_fails; + peer.fail_timeout = us->fail_timeout; + peer.sockaddr = us->addrs->sockaddr; + peer.socklen = us->addrs->socklen; + peer.name = us->addrs->name; + peer.down = us->down; + peer.fails = 0; + peer.server = us->name; + + peers->peer[peers->number++] = peer; + peers->total_weight += peer.weight; + peers->single = (peers->number == 1); + peers->weighted = (peers->total_weight != peers->number); + +#if (NGX_HTTP_UPSTREAM_LEAST_CONN) + flag = 1; +#endif + + } else { + backup = peers->next; + if (ngx_http_lua_upstream_exist_peer(backup, u) == 1) { + lua_pushnil(L); + lua_pushliteral(L, "the backup peer is exist\n"); + return 2; + } + + n = backup != NULL ? (backup->number - 1) : 0; + + old_size = n * sizeof(ngx_http_upstream_rr_peer_t) + + sizeof(ngx_http_upstream_rr_peers_t); + new_size = sizeof(ngx_http_upstream_rr_peer_t) + old_size; + + backup = ngx_prealloc(ngx_cycle->pool, peers->next, old_size, new_size, 1); + if (backup == NULL ) { + lua_pushnil(L); + lua_pushliteral(L, "backup pcalloc fail\n"); + return 2; + } + + peers->single = 0; + backup->single = 0; + + peer.weight = us->weight; + peer.effective_weight = us->weight; + peer.current_weight= 0; + peer.max_fails = us->max_fails; + peer.fail_timeout = us->fail_timeout; + peer.server = us->name; + peer.sockaddr = us->addrs->sockaddr; + peer.socklen = us->addrs->socklen; + peer.name = us->addrs->name; + peer.down = us->down; + peer.fails = 0; + + backup->peer[backup->number++] = peer; + backup->total_weight += peer.weight; + backup->single = (backup->number == 1); + backup->weighted = (backup->total_weight != backup->number); + + peers->next = backup; + } + + uscf->peer.data = peers; + +#if (NGX_HTTP_UPSTREAM_LEAST_CONN) + if(ngx_http_lua_upstream_least_conn_init(L, uscf, flag)) { + return 2; + } +#endif + +#if (NGX_HTTP_UPSTREAM_CONSISTENT_HASH) + if(ngx_http_lua_upstream_consistent_hash_init(L, uscf)) { + return 2; + } +#endif + + lua_pushboolean(L, 1); + return 1; +} + +/* + * The function is remove server from back-end peers. if + * the server is not find and return error and notes the + * server is not find. now suitable for round_robin or + * ip_hash least_conn hash. +*/ +static int +ngx_http_lua_upstream_remove_peer(lua_State * L) +{ + ngx_uint_t i, j, n; + ngx_uint_t flag; + size_t len; + ngx_str_t host; + ngx_http_upstream_rr_peers_t *peers; + ngx_http_upstream_rr_peers_t *backup; + ngx_http_upstream_rr_peers_t *primary_peers; + ngx_http_upstream_srv_conf_t *uscf; + ngx_http_request_t *r; + ngx_url_t u; + size_t old_size, new_size; + + if (lua_gettop(L) != 2) { + // two param is : "bar","ip:port" + // for lua code , you must pass this two param, is none ,you should + // consider pass default value. + lua_pushnil(L); + lua_pushliteral(L, "exactly two argument expected\n"); + return 2; + } + + r = ngx_http_lua_get_request(L); + if (r == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "get request error \n"); + return 2; + } + + host.data = (u_char *) luaL_checklstring(L, 1, &host.len); + + ngx_memzero(&u, sizeof (ngx_url_t)); + u.url.data = (u_char *) luaL_checklstring(L, 2, &u.url.len); + u.default_port = 80; + +#if (NGX_DEBUG) + ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0, "%s %s params: %s\n", __FILE__,__FUNCTION__ , u.url.data ); +#endif + + uscf = ngx_http_lua_upstream_find_upstream(L, &host); + if (uscf == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "upstream not found\n"); + return 2; + } + + peers = uscf->peer.data; + if (peers == NULL ) { + lua_pushnil(L); + lua_pushliteral(L, "peers is null\n"); + return 2; + } + + backup = peers ? peers->next : NULL; + flag = 0; + if (!ngx_http_lua_upstream_exist_peer(peers, u) + && !(flag = ngx_http_lua_upstream_exist_peer(backup, u))) { + lua_pushnil(L); + lua_pushliteral(L, "not found this peer\n"); + return 2; + } + + primary_peers = (flag == 1 ? backup : peers); + if (primary_peers == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "primary_peers is null\n"); + return 2; + } + + if (primary_peers->number == 1) { + lua_pushnil(L); + lua_pushliteral(L, "upstream last one is not allowed to delete\n"); + return 2; + } + + for (i = 0; (primary_peers->peer != NULL) && (i < primary_peers->number); i++) { + + len = primary_peers->peer[i].name.len; + if (len == u.url.len + && ngx_memcmp(u.url.data, primary_peers->peer[i].name.data, u.url.len) == 0) { + + for (j = i; j < primary_peers->number - 1; j++) { + primary_peers->peer[j] = primary_peers->peer[j + 1]; + } + + n = primary_peers->number - 1; + + old_size = n * sizeof(ngx_http_upstream_rr_peer_t) + + sizeof(ngx_http_upstream_rr_peers_t); + new_size = old_size - sizeof(ngx_http_upstream_rr_peer_t); + if(!flag) { + peers = ngx_prealloc(ngx_cycle->pool, peers, old_size, new_size, 0); + peers->number -= 1; + peers->single = (peers->number == 1); + + } else { + backup = ngx_prealloc(ngx_cycle->pool, backup, old_size, new_size, 0); + backup->number -= 1; + peers->next = backup; + peers->single = (peers->number == 1); + } + + uscf->peer.data = peers; + + break; + } + } + +#if (NGX_HTTP_UPSTREAM_LEAST_CONN) + if (ngx_http_lua_upstream_remover_peer_least_conn(L, uscf, i)) { + return 2; + } + +#endif + +#if (NGX_HTTP_UPSTREAM_CONSISTENT_HASH) + if (ngx_http_lua_upstream_remove_peer_chash(L, uscf)) { + return 2; + } + +#endif + + lua_pushboolean(L, 1); return 1; } +/* + * The function is check upstream whether there is + * such a u.url.data,if exist return srv_conf_t structure + * else return NULL. +*/ +static ngx_http_upstream_srv_conf_t * +ngx_http_lua_upstream_check_peers(lua_State * L, ngx_url_t u, + ngx_http_upstream_server_t ** srv) +{ + ngx_http_upstream_srv_conf_t *uscf; + ngx_str_t host; + + if (lua_gettop(L) != 2) { + lua_pushnil(L); + lua_pushliteral(L, "no argument expected\n"); + return NULL; + } + + ngx_memzero(&host, sizeof(ngx_str_t)); + host.data = (u_char *) luaL_checklstring(L, 1, &host.len); + + uscf = ngx_http_lua_upstream_find_upstream(L, &host); + if (uscf == NULL) { + lua_pushnil(L); + lua_pushliteral(L, "upstream not found\n"); + return NULL; + } + + *srv = ngx_http_lua_upstream_compare_server(uscf, u); + if (*srv == NULL) { + lua_pushnil(L); + lua_pushliteral(L,"not find this peer\n"); + return NULL; + } + + return uscf; +} + + +/* + * The function is check current peers whether exists + * a peer such a u.url.data if exists and return 1, + * else return 0. +*/ +static int +ngx_http_lua_upstream_exist_peer(ngx_http_upstream_rr_peers_t * peers, ngx_url_t u) +{ + ngx_uint_t i; + size_t len; + ngx_http_upstream_rr_peer_t peer; + + for (i = 0; (peers != NULL) && (i < peers->number); i++) { + peer = peers->peer[i]; + + len = peer.name.len; + if (len == u.url.len + && ngx_memcmp(u.url.data, peer.name.data, u.url.len) == 0) { + return 1; + } + } + + return 0; +} + + +static ngx_int_t +ngx_pfree_and_delay(ngx_pool_t *pool, void *p) +{ + ngx_pool_large_t *l; + + for (l = pool->large; l; l = l->next) { + if (p == l->alloc) { + ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, pool->log, 0, + "delay free: %p", l->alloc); + + ngx_http_lua_upstream_event_init(l->alloc); + + return NGX_OK; + } + } + + return NGX_DECLINED; +} + + +/* + * The function copy from tengine-2.1.0 core/ngx_palloc.c. + * +*/ +static void * +ngx_prealloc(ngx_pool_t *pool, void *p, size_t old_size, size_t new_size, ngx_uint_t flag) +{ + void *new; + ngx_pool_t *node; + + if (p == NULL) { + return ngx_palloc(pool, new_size); + } + + if (new_size == 0) { + if ((u_char *) p + old_size == pool->d.last) { + pool->d.last = p; + + } else { + ngx_pfree(pool, p); + } + + return NULL; + } + + if (old_size <= pool->max) { + for (node = pool; node; node = node->d.next) { + if ((u_char *)p + old_size == node->d.last + && (u_char *)p + new_size <= node->d.end) { + node->d.last = (u_char *)p + new_size; + return p; + } + } + } + + if (new_size <= old_size) { + return p; + } + + new = ngx_palloc(pool, new_size); + if (new == NULL) { + return NULL; + } + + ngx_memcpy(new, p, old_size); + + if (flag) { + ngx_pfree_and_delay(pool, p); + + } else { + ngx_pfree(pool, p); + } + + return new; +} + + static int ngx_http_lua_upstream_get_upstreams(lua_State * L) { @@ -385,6 +1655,7 @@ ngx_http_lua_upstream_lookup_peer(lua_State *L) } + static int ngx_http_lua_get_peer(lua_State *L, ngx_http_upstream_rr_peer_t *peer, ngx_uint_t id) @@ -460,7 +1731,6 @@ ngx_http_lua_get_peer(lua_State *L, ngx_http_upstream_rr_peer_t *peer, return 0; } - static ngx_http_upstream_main_conf_t * ngx_http_lua_upstream_get_upstream_main_conf(lua_State *L) { @@ -495,8 +1765,7 @@ ngx_http_lua_upstream_find_upstream(lua_State *L, ngx_str_t *host) uscf = uscfp[i]; if (uscf->host.len == host->len - && ngx_memcmp(uscf->host.data, host->data, host->len) == 0) - { + && ngx_memcmp(uscf->host.data, host->data, host->len) == 0) { return uscf; } } @@ -520,8 +1789,7 @@ ngx_http_lua_upstream_find_upstream(lua_State *L, ngx_str_t *host) if (uscf->port && uscf->port == n && uscf->host.len == len - && ngx_memcmp(uscf->host.data, host->data, len) == 0) - { + && ngx_memcmp(uscf->host.data, host->data, len) == 0) { return uscf; } } @@ -529,3 +1797,4 @@ ngx_http_lua_upstream_find_upstream(lua_State *L, ngx_str_t *host) return NULL; } + diff --git a/src/ngx_http_lua_upstream_module.h b/src/ngx_http_lua_upstream_module.h new file mode 100644 index 0000000..0aec1a0 --- /dev/null +++ b/src/ngx_http_lua_upstream_module.h @@ -0,0 +1,49 @@ +#ifndef NGX_LUA_HTTP_MODULE +#define NGX_LUA_HTTP_MODULE + +// the NGX_HTTP_UPSTREAM_LEAST_CONN define is support least_conn. +#define NGX_HTTP_UPSTREAM_LEAST_CONN 0 +// the NGX_HTTP_UPSTREAM_LEAST_CONN define is support hash. +#define NGX_HTTP_UPSTREAM_CONSISTENT_HASH 0 + + +#if (NGX_HTTP_UPSTREAM_LEAST_CONN) + +//Mark the variable from ngx_http_upstream_least_conn_module.c file +extern ngx_module_t ngx_http_upstream_least_conn_module; + + +//Mark the struct from ngx_http_upstream_least_conn_module.c file +typedef struct { + ngx_uint_t *conns; +} ngx_http_upstream_least_conn_conf_t; + +#endif + + +#if (NGX_HTTP_UPSTREAM_CONSISTENT_HASH) + +extern ngx_module_t ngx_http_upstream_hash_module; + + +typedef struct { + uint32_t hash; + ngx_str_t *server; +} ngx_http_upstream_chash_point_t; + + +typedef struct { + ngx_uint_t number; + ngx_http_upstream_chash_point_t point[1]; +} ngx_http_upstream_chash_points_t; + + +typedef struct { + ngx_http_complex_value_t key; + ngx_http_upstream_chash_points_t *points; +} ngx_http_upstream_hash_srv_conf_t; + + +#endif + +#endif diff --git a/t/check_update_server.t b/t/check_update_server.t new file mode 100644 index 0000000..79ed8c5 --- /dev/null +++ b/t/check_update_server.t @@ -0,0 +1,220 @@ +# vim:set ft= ts=4 sw=4 et fdm=marker: + +use Test::Nginx::Socket::Lua; + +#worker_connections(1014); +#master_on(); +#workers(2); +#log_level('warn'); + +repeat_each(1); + +plan tests => repeat_each() * (blocks() * 3); + +$ENV{TEST_NGINX_MEMCACHED_PORT} ||= 11211; + +$ENV{TEST_NGINX_MY_INIT_CONFIG} = <<_EOC_; +lua_package_path "t/lib/?.lua;;"; +_EOC_ + +#no_diff(); +no_long_string(); +run_tests(); + +__DATA__ + +=== TEST 1: add server with upstream +--- http_config + upstream foo.com { + server agentzh.org:81; + } + + upstream bar { + server 127.0.0.1:81; + } +--- config + location /add_server { + default_type text/plain; + content_by_lua ' + local concat = table.concat + local upstream = require "ngx.upstream" + local get_servers = upstream.get_servers + local get_upstreams = upstream.get_upstreams + local add_server = upstream.add_server + + local args = ngx.req.get_uri_args() + local upstream_name + + upstream_name = args["upstream"] + local server_ip = args["ip"] + local server_port = args["port"] + local weight = 2 + local max_fails = 10 + local fail_timeout = 10 + local is_backup = false + + local str,err = add_server("bar",server_ip..":"..server_port,weight,max_fails,fail_timeout,is_backup) + if not str then + ngx.say("the server is exist :",server_ip..":"..server_port) + end + + ngx.print("----------------------------\\n") + + local srvs, err = get_servers(upstream_name) + if not srvs then + ngx.say("failed to get servers: ",err) + else + for _, srv in ipairs(srvs) do + local first = true + for k, v in pairs(srv) do + if first then + first = false + ngx.print(" ") + else + ngx.print(", ") + end + if type(v) == "table" then + ngx.print(k, " = {", concat(v, ", "), "}") + else + ngx.print(k, " = ", v) + end + end + ngx.print("\\n") + end + end + '; + } + +--- request + GET /add_server?upstream=bar&ip=127.0.0.1&port=80 +--- response_body + +---------------------------- + addr = 127.0.0.1:81, weight = 1, fail_timeout = 10, max_fails = 1 + addr = 127.0.0.1:80, weight = 2, fail_timeout = 10, max_fails = 10 +--- no_error_log +[error] + + + +=== TEST 2: add peer with upstream'peers +--- http_config + upstream foo.com { + server 127.0.0.1 fail_timeout=53 weight=4 max_fails=100; + server agentzh.org:81; + } + + upstream bar { + server 127.0.0.1:81 weight=1; + } +--- config + location /add_peer { + default_type text/plain; + content_by_lua ' + local concat = table.concat + local upstream = require "ngx.upstream" + local add_peer = upstream.add_peer + + local args = ngx.req.get_uri_args() + local upstream_name + + upstream_name = args["upstream"] + local server_ip = args["ip"] + local server_port = args["port"] + + ngx.say("server ", ":", server_ip..":"..server_port) + + local str,err = add_peer(upstream_name,server_ip..":"..server_port) + if not str then + ngx.say(err) + end + ngx.print("\\n----------------------------\\n") + '; + } + +--- request + GET /add_peer?upstream=bar&ip=127.0.0.1&port=80 +--- response_body +server :127.0.0.1:80 +not find this peer + + +---------------------------- +--- no_error_log +[error] + + + +=== TEST 3: remove server from upstream +--- http_config + upstream foo.com { + server 127.0.0.1 fail_timeout=53 weight=4 max_fails=100; + server agentzh.org:81; + } + + upstream bar { + server 127.0.0.1:81 weight=1; + } +--- config + location /remove_server { + content_by_lua ' + local upstream = require "ngx.upstream" + local remove_server = upstream.remove_server + local args = ngx.req.get_uri_args() + upstream_name = args["upstream"] + local server_ip = args["ip"] + local server_port = args["port"] + + local ser, err = remove_server(upstream_name,server_ip..":"..server_port) + if not ser then + ngx.say("failed to remove server: ", err) + return + end + '; + } + +--- request + GET /remove_server?upstream=bar&ip=127.0.0.1&port=81 +--- response_body +--- no_error_log +[error] + + + +=== TEST 4: remove peer from upstream + +--- http_config + + upstream foo.com { + server 127.0.0.1 fail_timeout=53 weight=4 max_fails=100; + server agentzh.org:81; + } + + upstream bar { + server 127.0.0.1:81 weight=1; + } +--- config + location /remove_peer { + content_by_lua ' + local upstream = require "ngx.upstream" + local remove_peer = upstream.remove_peer + local args = ngx.req.get_uri_args() + upstream_name = args["upstream"] + local server_ip = args["ip"] + local server_port = args["port"] + + local ser, err = remove_peer(upstream_name,server_ip..":"..server_port) + if not ser then + ngx.say("failed to remove peer: ", err) + return + end + '; + } + +--- request +--- request + GET /remove_peer?upstream=bar&ip=127.0.0.1&port=81 +--- response_body +--- no_error_log +[error] + diff --git a/t/lib/dyupsc.lua b/t/lib/dyupsc.lua new file mode 100644 index 0000000..fe11e71 --- /dev/null +++ b/t/lib/dyupsc.lua @@ -0,0 +1,271 @@ +local log = ngx.log +local ERR = ngx.ERR +local INFO = ngx.INFO +local WARN = ngx.WARN +local DEBUG = ngx.DEBUG +local str_find = string.find +local sub = string.sub +local str_len = string.len +local gsub = string.gsub +local table_sort = table.sort +local new_timer = ngx.timer.at +local shared = ngx.shared +local debug_mode = ngx.config.debug +local worker_pid = ngx.worker.pid() +local tonumber = tonumber +local pairs = pairs + + +local _M = { + _VERSION = '0.01' +} + + +local ok, upstream = pcall(require, "ngx.upstream") +if not ok then + error("ngx_upstream_lua module required") +end + + +local set_peer_down = upstream.set_peer_down +local add_server = upstream.add_server +local add_peer = upstream.add_peer +local remove_server = upstream.remove_server +local remove_peer = upstream.remove_peer +local get_primary_peers = upstream.get_primary_peers +local get_backup_peers = upstream.get_backup_peers +local get_upstreams = upstream.get_upstreams + + +local function addserver(upstream, ipport) + local ok,err = add_server(upstream, ipport, server_info.weight, server_info.max_fails, server_info.fail_timeout) + return ok, err +end + + +-- global var is save some server general information +-- forexample weight max_fails fail_timeout +server_info = { } +server_info.weight = 2 +server_info.max_fails = 10 +server_info.fail_timeout = 10 + +-- action save some function addrs that make it dynamicly running +local action = { } +action.add_server = addserver +action.add_peer = add_peer +action.remove_server = remove_server +action.remove_peer = remove_peer + + +local function info(...) + log(INFO, "dyupsc: ", ...) +end + + +local function warn(...) + log(WARN, "dyupsc: ", ...) +end + + +local function errlog(...) + log(ERR, "dyupsc: ", ...) +end + + +local function debug(...) + if debug_mode then + log(DEBUG, "dyupsc: ", ...) + end +end + + +local function trim (s) + return gsub(s, "^%s*(.-)%s*$", "%1") +end + + +local function split(buf, sep) + local findstartindex = 1 + local splitindex = 1 + local splitarray = { } + while true do + local findlastindex = str_find(buf, sep, findstartindex) + if not findlastindex then + splitarray[splitindex] = sub(buf, findstartindex, str_len(buf)) + break + end + splitarray[splitindex] = sub(buf, findstartindex, findlastindex - 1) + findstartindex = findlastindex + str_len(sep) + splitindex = splitindex + 1 + end + return splitarray +end + + +local function run_lock(cmdkey, ctx) + local dict = ctx.dict + local cmd = split(cmdkey, ":") + local key = worker_pid..":"..cmdkey + if tonumber(cmd[1]) then + key = cmdkey + end + + local ok, err = dict:add(key, true, ctx.interval*2) + if not ok then + if err == "exists" then + return nil + end + errlog("failed to add key \"", key, "\": ", err) + return nil + end + return true +end + + +local function dec_sort(table) + table_sort(table,function(a,b) return a>b end) +end + + +local function do_run(ctx) + local dict = ctx.dict + local worker_process = ctx.worker_process + local listkeys = dict:get_keys(0) + if #listkeys <= 0 then + info("the cmd list keys is null") + return + end + + local key = "version" + local ver, err = dict:get(key) + if not ver then + errlog("failed to get cmd version that not \"version\" key") + return + end + + if err then + errlog("failed to get cmd version: ", err) + return + end + -- make listkeys sort and make add_server before add_peer running + -- because add_peer will judge whether the server + dec_sort(listkeys) + + for i, v in pairs(listkeys) do + local cmd = split(v,":") + if (v ~= "version") and run_lock(v, ctx) and (ctx.version <= ver) then + if err then + errlog("get listkes value error: ", err) + return + end + + local num, err = dict:get(v) + if not num then + errlog("do not get cmd value") + return + end + + if err then + errlog("fail get cmd value",err) + return + end + + if num >= worker_process then + dict:delete(v) + + else + dict:incr(v, 1) + local operation = trim(cmd[1]) + local upstream = trim(cmd[2]) + local ip = trim(cmd[3]) + local port = trim(cmd[4]) + + if not operation or not upstream or not ip or not port then + errlog("may be operation or upstream or ip or port is nil") + return + end + + local ok, err = action[operation](upstream, ip..":"..port) + if not ok then + errlog(operation," is fail error info: ",err) + return + end + + end + end + end + ctx.version = ver +end + + +local check_pull +check_pull = function (premature, ctx) + if premature then + return + end + + local ok, err = pcall(do_run, ctx) + if not ok then + errlog("failed to run dyupsc cycle: ", err) + end + + local ok, err = new_timer(ctx.interval, check_pull, ctx) + if not ok then + if err ~= "process exiting" then + errlog("failed to create timer: ", err) + end + return + end +end + + +function _M.dyups_checker(opts) + local interval = opts.interval + if not interval then + interval = 5 + + else + interval = interval / 1000 + if interval < 0.002 then -- minimum 2ms + interval = 0.002 + end + end + + local worker_process = opts.worker_process + if not worker_process then + worker_process = 2 + end + + -- debug("interval: ", interval) + local shm = opts.shm + if not shm then + return nil, "\"shm\" option required" + end + + local dict = shared[shm] + if not dict then + return nil, "shm \"" .. tostring(shm) .. "\" not found" + end + + server_info.weight = opts.weight + server_info.max_fails = opts.max_fails + server_info.fail_timeout = opts.fail_timeout + + local ctx = { + interval = interval, + dict = dict, + version = 0, + worker_process = worker_process, + } + + local ok, err = new_timer(0, check_pull, ctx) + if not ok then + return nil, "failed to create timer: " .. err + end + + return true +end + + +return _M diff --git a/t/sanity.t b/t/sanity.t index dd17159..82f5490 100644 --- a/t/sanity.t +++ b/t/sanity.t @@ -564,3 +564,4 @@ upstream 127.0.0.1:1130: --- no_error_log [error] +