diff --git a/memcached-1.4.15/assoc.c b/memcached-1.4.15/assoc.c
index 579db4e..69a2b3a 100644
--- a/memcached-1.4.15/assoc.c
+++ b/memcached-1.4.15/assoc.c
@@ -33,11 +33,13 @@ typedef unsigned char ub1;   /* unsigned 1-byte quantities */
 /* how many powers of 2's worth of buckets we use */
 unsigned int hashpower = HASHPOWER_DEFAULT;
-
+//hashsize(n) is a power of 2
 #define hashsize(n) ((ub4)1<<(n))
+//the hash mask; in binary, hashmask(n) is a value whose low n bits are all 1
 #define hashmask(n) (hashsize(n)-1)

 /* Main hash table. This is where we look except during expansion. */
+//pointer to the hash table
 static item** primary_hashtable = 0;

 /*
@@ -58,11 +60,12 @@ static bool started_expanding = false;
  * far we've gotten so far. Ranges from 0 .. hashsize(hashpower - 1) - 1.
  */
 static unsigned int expand_bucket = 0;
-
+//called from main()
 void assoc_init(const int hashtable_init) {
     if (hashtable_init) {
         hashpower = hashtable_init;
     }
+    //allocate the table; hashsize(hashpower) is the number of buckets
     primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
     if (! primary_hashtable) {
         fprintf(stderr, "Failed to init hashtable.\n");
@@ -73,7 +76,11 @@ void assoc_init(const int hashtable_init) {
     stats.hash_bytes = hashsize(hashpower) * sizeof(void *);
     STATS_UNLOCK();
 }
-
+//The hash value only determines which bucket the key falls into, but a bucket holds a
+//whole collision chain, so the chain still has to be walked and every node's key compared.
+//Although the key is a '\0'-terminated string, calling strlen would cost an extra pass
+//over it, so the extra parameter nkey supplies the key length instead.
+//reference: http://blog.csdn.net/luotuo44/article/details/42773231
 item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
     item *it;
     unsigned int oldbucket;
@@ -84,13 +91,15 @@ item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
     {
         it = old_hashtable[oldbucket];
     } else {
+        //the hash value decides which bucket this key belongs to
         it = primary_hashtable[hv & hashmask(hashpower)];
     }

-    // search the bucket for the target
+    // walk the bucket's chain searching for the target
     item *ret = NULL;
     int depth = 0;
     while (it) {
+        //compare the keys with memcmp
         if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
             ret = it;
             break;
@@ -104,7 +113,7 @@ item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {

 /* returns the address of the item pointer before the key.  if *item == 0,
    the item wasn't found */
-
+//look up an item; returns the address of the predecessor node's h_next member
 static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
     item **pos;
     unsigned int oldbucket;
@@ -116,7 +125,7 @@ static item** _hashitem_before (const char *key, const size_t nkey, const uint32
     } else {
         pos = &primary_hashtable[hv & hashmask(hashpower)];
     }
-
+    //walk the bucket's collision chain looking for the item
     while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
         pos = &(*pos)->h_next;
     }
@@ -126,7 +135,7 @@ static item** _hashitem_before (const char *key, const size_t nkey, const uint32
 /* grows the hashtable to the next power of 2. */
 static void assoc_expand(void) {
     old_hashtable = primary_hashtable;
-
+    //allocate a new table; primary_hashtable now points at the new one, old_hashtable at the old
     primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
     if (primary_hashtable) {
         if (settings.verbose > 1)
@@ -144,15 +153,17 @@ static void assoc_expand(void) {
         /* Bad news, but we can keep running. */
     }
 }
-
+//called from assoc_insert: when the item count reaches 1.5 times the table length,
+//this function wakes the expansion thread
 static void assoc_start_expand(void) {
     if (started_expanding)
         return;
     started_expanding = true;
+    //pthread_cond_signal wakes the thread waiting on maintenance_cond, i.e. the expansion thread
     pthread_cond_signal(&maintenance_cond);
 }
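To make the bucket arithmetic concrete, here is a minimal standalone sketch (not part of the patch; hashpower and the hash value are made up) of how hashsize/hashmask map a hash value to a bucket, and of the 1.5x load-factor test assoc_insert uses to trigger expansion:

    /* standalone sketch of the bucket arithmetic used above; illustrative only */
    #include <stdint.h>
    #include <stdio.h>

    #define hashsize(n) ((uint32_t)1 << (n))
    #define hashmask(n) (hashsize(n) - 1)

    int main(void) {
        unsigned int hashpower = 16;          /* 2^16 = 65536 buckets */
        uint32_t hv = 0xdeadbeef;             /* pretend hash of some key */

        /* hv & hashmask(n) keeps the low n bits: the bucket index */
        printf("bucket = %u of %u\n",
               (unsigned)(hv & hashmask(hashpower)), (unsigned)hashsize(hashpower));

        /* the expansion trigger from assoc_insert: items > 1.5 * buckets */
        unsigned int hash_items = 100000;
        if (hash_items > (hashsize(hashpower) * 3) / 2)
            printf("would wake the expansion thread\n");
        return 0;
    }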
 /* Note: this isn't an assoc_update.  The key must not already exist to call this */
+//hash-table insert; hv is the hash value of this item's key
 int assoc_insert(item *it, const uint32_t hv) {
     unsigned int oldbucket;

@@ -162,28 +173,31 @@ int assoc_insert(item *it, const uint32_t hv) {
     if (expanding &&
         (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
     {
+        //we are expanding, but this bucket has not been migrated yet, so insert into the old table
         it->h_next = old_hashtable[oldbucket];
         old_hashtable[oldbucket] = it;
     } else {
+        //insert into the new table, primary_hashtable
         it->h_next = primary_hashtable[hv & hashmask(hashpower)];
         primary_hashtable[hv & hashmask(hashpower)] = it;
     }
-
+    //item count + 1
     hash_items++;
-    // expand when appropriate
+    // expand when appropriate: when the item count exceeds 1.5 times the table capacity
     if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
+        //assoc_start_expand wakes the expansion thread
         assoc_start_expand();
     }

     MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
     return 1;
 }
-
+//The usual way to delete a node from a linked list: find its predecessor first,
+//then splice the node out through the predecessor's next pointer.
 void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
     // find the node just before the one to delete -- the classic linked-list removal
     item **before = _hashitem_before(key, nkey, hv);
-
+    //found it
     if (*before) {
         item *nxt;
         hash_items--;
@@ -201,14 +215,20 @@ void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
     assert(*before != 0);
 }
-
+//global flag used to decide whether the expansion thread should stop
 static volatile int do_run_maintenance_thread = 1;

+//how many buckets are migrated per pass, so the locks are not held for too long;
+//used in assoc_maintenance_thread. details: http://blog.csdn.net/luotuo44/article/details/42773231
 #define DEFAULT_HASH_BULK_MOVE 1
 int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;

-static void *assoc_maintenance_thread(void *arg) {
+//the expansion thread, started from main(); after one pass it blocks on the condition
+//variable maintenance_cond and runs again when an insert crosses the threshold and signals it
+static void *assoc_maintenance_thread(void *arg) {
+    //do_run_maintenance_thread is a global, initially 1;
+    //stop_assoc_maintenance_thread sets it to 0 to terminate the migration thread
     while (do_run_maintenance_thread) {
         int ii = 0;

@@ -216,11 +236,13 @@
          * hash table. */
         item_lock_global();
         mutex_lock(&cache_lock);
-
+        //hash_bulk_move controls how many buckets' items are migrated per pass (default 1).
+        //the loop body runs only while expanding is true, so the freshly created thread
+        //does not enter it
         for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
             item *it, *next;
             int bucket;
-
+            //walk the old table's bucket expand_bucket and move every item into the new table
             for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
                 next = it->h_next;

@@ -230,8 +252,9 @@
             }

             old_hashtable[expand_bucket] = NULL;
-
+            //one bucket migrated; advance expand_bucket to the next one
             expand_bucket++;
+            //all data migrated
             if (expand_bucket == hashsize(hashpower - 1)) {
                 expanding = false;
                 free(old_hashtable);
@@ -243,10 +266,10 @@
                     fprintf(stderr, "Hash table expansion done\n");
             }
         }
-
+        //release the locks
         mutex_unlock(&cache_lock);
         item_unlock_global();
-
+        //no data left to migrate
         if (!expanding) {
             /* finished expanding. tell all threads to use fine-grained locks */
             switch_item_lock_type(ITEM_LOCK_GRANULAR);
@@ -254,13 +277,16 @@
             /* We are done expanding.. just wait for next invocation */
             mutex_lock(&cache_lock);
             started_expanding = false;
+            //park the migration thread here until a worker inserts an item, sees that the
+            //item count has reached 1.5 times the table size, and calls assoc_start_expand,
+            //whose pthread_cond_signal wakes us up again
             pthread_cond_wait(&maintenance_cond, &cache_lock);
             /* Before doing anything, tell threads to use a global lock */
             mutex_unlock(&cache_lock);
             slabs_rebalancer_pause();
             switch_item_lock_type(ITEM_LOCK_GLOBAL);
             mutex_lock(&cache_lock);
-            assoc_expand();
+            assoc_expand();//allocate the doubled table; the loop above then migrates the buckets
             mutex_unlock(&cache_lock);
         }
     }
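The migration loop is hard to picture from diff fragments alone, so here is a condensed toy model of the same idea (not memcached's code; the node type and globals are invented): move at most hash_bulk_move buckets per pass while holding the lock, then yield.

    /* toy model of incremental rehashing; same shape as the loop above */
    #include <stdlib.h>

    typedef struct node { struct node *h_next; unsigned int hv; } node;

    static node **old_tab, **new_tab;
    static unsigned int old_size, new_mask;   /* new table has 2x buckets */
    static unsigned int expand_bucket = 0;    /* next bucket to migrate */

    /* migrate up to bulk buckets; returns nonzero while work remains */
    int bulk_move(int bulk) {
        for (int i = 0; i < bulk && expand_bucket < old_size; i++) {
            node *it = old_tab[expand_bucket], *next;
            for (; it != NULL; it = next) {           /* re-link each chain node */
                next = it->h_next;
                unsigned int b = it->hv & new_mask;   /* bucket in the new table */
                it->h_next = new_tab[b];
                new_tab[b] = it;
            }
            old_tab[expand_bucket++] = NULL;
        }
        return expand_bucket < old_size;
    }

While migration is underway, lookups compare the bucket index against expand_bucket to decide which table holds a key, exactly as assoc_find and assoc_insert do above.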
@@ -269,6 +295,7 @@ static void *assoc_maintenance_thread(void *arg) {

 static pthread_t maintenance_tid;

+//called from main() to start the data-migration (expansion) thread
 int start_assoc_maintenance_thread() {
     int ret;
     char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
diff --git a/memcached-1.4.15/cache.c b/memcached-1.4.15/cache.c
index 07a2ae0..689f56b 100644
--- a/memcached-1.4.15/cache.c
+++ b/memcached-1.4.15/cache.c
@@ -16,6 +16,7 @@ int cache_error = 0;

 const int initial_pool_size = 64;

+//create a cache object
 cache_t* cache_create(const char *name, size_t bufsize, size_t align,
                       cache_constructor_t* constructor,
                       cache_destructor_t* destructor) {
@@ -54,6 +55,7 @@ static inline void* get_object(void *ptr) {
 #endif
 }

+//destroy a cache object
 void cache_destroy(cache_t *cache) {
     while (cache->freecurr > 0) {
         void *ptr = cache->ptr[--cache->freecurr];
diff --git a/memcached-1.4.15/items.c b/memcached-1.4.15/items.c
index 1a1952a..6ac4f8a 100644
--- a/memcached-1.4.15/items.c
+++ b/memcached-1.4.15/items.c
@@ -1,4 +1,5 @@
 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+//some of these annotations follow: http://blog.csdn.net/luotuo44/article/details/42869325
 #include "memcached.h"
 #include
 #include
@@ -36,9 +37,13 @@ typedef struct {
     uint64_t evicted_unfetched;
 } itemstats_t;

+//the arrays below manage the LRU queues; see http://blog.csdn.net/luotuo44/article/details/42869325
+//head of each LRU queue
 static item *heads[LARGEST_ID];
+//tail of each LRU queue
 static item *tails[LARGEST_ID];
 static itemstats_t itemstats[LARGEST_ID];
+//number of items in each LRU queue
 static unsigned int sizes[LARGEST_ID];

 void item_stats_reset(void) {
@@ -49,6 +54,7 @@ void item_stats_reset(void) {

 /* Get the next CAS id for a new item. */
+//generate the CAS value for a new item
 uint64_t get_cas_id(void) {
     static uint64_t cas_id = 0;
     return ++cas_id;
@@ -77,6 +83,7 @@ uint64_t get_cas_id(void) {
  *
  * Returns the total size of the header.
  */
+//compute an item's total length; this is what determines its slabclass id
 static size_t item_make_header(const uint8_t nkey, const int flags, const int nbytes,
                      char *suffix, uint8_t *nsuffix) {
     /* suffix is defined at 40 chars elsewhere.. */
@@ -91,17 +98,19 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,
     uint8_t nsuffix;
     item *it = NULL;
     char suffix[40];
+    //compute how much space this item needs
     size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
     if (settings.use_cas) {
         ntotal += sizeof(uint64_t);
     }
-
+    //the size decides which slab class the item belongs to
     unsigned int id = slabs_clsid(ntotal);
     if (id == 0)
         return 0;

     mutex_lock(&cache_lock);
     /* do a quick check if we have any expired items in the tail.. */
+    //try up to 5 items at the LRU tail; if none is usable, fall back to allocating fresh space
     int tries = 5;
     int tried_alloc = 0;
     item *search;
@@ -136,6 +145,7 @@ item *do_item_alloc(char *key, const size_t nkey, const int flags,
         }

         /* Expired or flushed */
+        //the item search points to has expired, so its memory can be reused directly
         if ((search->exptime != 0 && search->exptime < current_time)
             || (search->time <= oldest_live && oldest_live <= current_time)) {
             itemstats[id].reclaimed++;
@@ -147,7 +157,10 @@
             do_item_unlink_nolock(it, hv);
             /* Initialize the item block: */
             it->slabs_clsid = 0;
-        } else if ((it = slabs_alloc(ntotal, id)) == NULL) {
+        }
+        //at this point no expired item was found and the allocation failed too; the only
+        //option left is to evict an item via LRU (even though it has not expired)
+        else if ((it = slabs_alloc(ntotal, id)) == NULL) {
             tried_alloc = 1;
             if (settings.evict_to_free == 0) {
                 itemstats[id].outofmemory++;
@@ -183,7 +196,7 @@
             item_trylock_unlock(hold_lock);
         break;
     }
-
+    //allocate memory from the slab allocator
     if (!tried_alloc && (tries == 0 || search == NULL))
         it = slabs_alloc(ntotal, id);

@@ -248,7 +261,7 @@ bool item_size_ok(const size_t nkey, const int flags, const int nbytes) {

     return slabs_clsid(ntotal) != 0;
 }
-
+//insert the item at the head of its LRU queue
 static void item_link_q(item *it) { /* item is the new head */
     item **head, **tail;
     assert(it->slabs_clsid < LARGEST_ID);
@@ -258,42 +271,47 @@ static void item_link_q(item *it) { /* item is the new head */
     tail = &tails[it->slabs_clsid];
     assert(it != *head);
     assert((*head && *tail) || (*head == 0 && *tail == 0));
+    //head insertion
     it->prev = 0;
     it->next = *head;
     if (it->next) it->next->prev = it;
     *head = it;
     if (*tail == 0) *tail = it;
+    //count + 1
     sizes[it->slabs_clsid]++;
     return;
 }
-
+//remove it from its LRU queue
 static void item_unlink_q(item *it) {
     item **head, **tail;
     assert(it->slabs_clsid < LARGEST_ID);
     head = &heads[it->slabs_clsid];
     tail = &tails[it->slabs_clsid];
-
+    //it is the head node
     if (*head == it) {
         assert(it->prev == 0);
         *head = it->next;
     }
+    //it is the tail node
     if (*tail == it) {
         assert(it->next == 0);
         *tail = it->prev;
     }
     assert(it->next != it);
     assert(it->prev != it);
-
+    //link predecessor and successor to each other
     if (it->next) it->next->prev = it->prev;
     if (it->prev) it->prev->next = it->next;
     sizes[it->slabs_clsid]--;
     return;
 }
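As a quick illustration of the invariant these two functions maintain -- most-recently-used at the head, least-recently-used at the tail -- here is a throwaway harness (types invented for the demo, not memcached's) that head-inserts three nodes and walks the queue:

    /* tiny harness for the head-insert LRU discipline shown above */
    #include <stdio.h>

    typedef struct it { struct it *prev, *next; const char *key; } it_t;
    static it_t *head, *tail;

    static void link_q(it_t *x) {        /* same steps as item_link_q */
        x->prev = NULL;
        x->next = head;
        if (x->next) x->next->prev = x;
        head = x;
        if (tail == NULL) tail = x;
    }

    int main(void) {
        it_t a = {0,0,"a"}, b = {0,0,"b"}, c = {0,0,"c"};
        link_q(&a); link_q(&b); link_q(&c);
        for (it_t *p = head; p; p = p->next)   /* prints: c b a */
            printf("%s ", p->key);
        printf("\n");
        return 0;
    }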
-
+//insert the item into the hash table and the LRU queue; hv is the hash value
 int do_item_link(item *it, const uint32_t hv) {
     MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
+    //make sure the item has been handed out by the slab allocator but is not yet in the LRU queue
     assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
     mutex_lock(&cache_lock);
+    //set the linked flag
     it->it_flags |= ITEM_LINKED;
     it->time = current_time;

@@ -305,14 +323,17 @@
     /* Allocate a new CAS ID on link. */
     ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
+    //insert into the hash table
     assoc_insert(it, hv);
+    //insert the item into its LRU queue
     item_link_q(it);
+    //reference count + 1
     refcount_incr(&it->refcount);
     mutex_unlock(&cache_lock);

     return 1;
 }
-
+//remove the item from the hash table and the LRU queue
 void do_item_unlink(item *it, const uint32_t hv) {
     MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
     mutex_lock(&cache_lock);
@@ -322,8 +343,11 @@
         stats.curr_bytes -= ITEM_ntotal(it);
         stats.curr_items -= 1;
         STATS_UNLOCK();
+        //remove from the hash table
         assoc_delete(ITEM_key(it), it->nkey, hv);
+        //remove from the LRU queue
         item_unlink_q(it);
+        //hand the item back to the slab allocator
         do_item_remove(it);
     }
     mutex_unlock(&cache_lock);
@@ -343,25 +367,29 @@ void do_item_unlink_nolock(item *it, const uint32_t hv) {
         do_item_remove(it);
     }
 }
-
+//return the item to the slab allocator
 void do_item_remove(item *it) {
     MEMCACHED_ITEM_REMOVE(ITEM_key(it), it->nkey, it->nbytes);
     assert((it->it_flags & ITEM_SLABBED) == 0);
-
+    //only return it once the reference count drops to 0
     if (refcount_decr(&it->refcount) == 0) {
         item_free(it);
     }
 }
-
+//on access, move the item back to the head of its LRU queue
 void do_item_update(item *it) {
     MEMCACHED_ITEM_UPDATE(ITEM_key(it), it->nkey, it->nbytes);
     if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
         assert((it->it_flags & ITEM_SLABBED) == 0);

         mutex_lock(&cache_lock);
+        //enough time has passed since the last update
         if ((it->it_flags & ITEM_LINKED) != 0) {
+            //remove from the LRU queue
             item_unlink_q(it);
+            //refresh the access time
            it->time = current_time;
+            //insert at the head of the LRU queue
             item_link_q(it);
         }
         mutex_unlock(&cache_lock);
@@ -550,6 +578,10 @@ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
     }

     if (it != NULL) {
+        //settings.oldest_live starts at 0 and records when the user last ran flush_all,
+        //which invalidates all items. it->time <= settings.oldest_live means this item
+        //already existed when flush_all was issued, so it has to be deleted
         if (settings.oldest_live != 0 && settings.oldest_live <= current_time &&
             it->time <= settings.oldest_live) {
             do_item_unlink(it, hv);
@@ -558,7 +590,9 @@
             if (was_found) {
                 fprintf(stderr, " -nuked by flush");
             }
-        } else if (it->exptime != 0 && it->exptime <= current_time) {
+        }
+        //the item has expired
+        else if (it->exptime != 0 && it->exptime <= current_time) {
             do_item_unlink(it, hv);
             do_item_remove(it);
             it = NULL;
@@ -587,6 +621,7 @@ item *do_item_touch(const char *key, size_t nkey, uint32_t exptime,
 }

 /* expires items that are more recent than the oldest_live setting. */
+//used by flush_all to expire items; oldest_live records when the flush_all command arrived
 void do_item_flush_expired(void) {
     int i;
     item *iter, *next;
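memcached never scans for expired items proactively; expiry is checked lazily at access time, as do_item_get does above. A minimal sketch of that rule (field names borrowed from the item struct; the wrapper type is invented):

    /* lazy-expiry predicate in the spirit of do_item_get; illustrative only */
    #include <stdbool.h>
    #include <time.h>

    typedef struct { time_t exptime; time_t time; } item_view;

    /* oldest_live is the time of the last flush_all, 0 if none */
    bool is_dead(const item_view *it, time_t now, time_t oldest_live) {
        if (oldest_live != 0 && oldest_live <= now && it->time <= oldest_live)
            return true;                      /* existed before flush_all: nuke */
        if (it->exptime != 0 && it->exptime <= now)
            return true;                      /* past its TTL: expired */
        return false;                         /* still live */
    }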
diff --git a/memcached-1.4.15/memcached.c b/memcached-1.4.15/memcached.c
index 1843eac..3da3269 100644
--- a/memcached-1.4.15/memcached.c
+++ b/memcached-1.4.15/memcached.c
@@ -321,19 +321,21 @@ conn *conn_from_freelist() {

 /*
  * Adds a connection to the freelist. 0 = success.
+ * add a conn to the free list
  */
 bool conn_add_to_freelist(conn *c) {
     bool ret = true;
     pthread_mutex_lock(&conn_lock);
+    //room left: store it directly
     if (freecurr < freetotal) {
         freeconns[freecurr++] = c;
         ret = false;
-    // the index in use has gone past the free total; try to grow the free array
+    //no room left: grow the array
     } else {
         /* try to enlarge free connections array */
         // try to enlarge the array of free connection structs
-        size_t newsize = freetotal * 2; // exponential growth
+        size_t newsize = freetotal * 2; // double the capacity
         conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
         if (new_freeconns) {
             freetotal = newsize;
@@ -372,7 +374,7 @@ conn *conn_new(const int sfd, enum conn_states init_state,
         { /* data */ };

-    conn *c = conn_from_freelist();
+    conn *c = conn_from_freelist();//grab a free connection struct
     if (NULL == c) {
         // the freelist may be empty, since its default size is limited; do a fresh allocation
@@ -435,7 +437,7 @@
     } else {
         c->request_addr_size = 0;
     }
-
+    //log some information
     if (settings.verbose > 1) {
         if (init_state == conn_listening) {
             fprintf(stderr, "<%d server listening (%s)\n", sfd,
@@ -617,9 +619,10 @@ static void conn_close(conn *c) {
  * This should only be called in between requests since it can wipe output
  * buffers!
  */
+//shrink the connection's buffers
 static void conn_shrink(conn *c) {
     assert(c != NULL);
-
+    //UDP buffers never need shrinking
     if (IS_UDP(c->transport))
         return;
@@ -722,6 +725,7 @@ static void conn_set_state(conn *c, enum conn_states state) {
 static int ensure_iov_space(conn *c) {
     assert(c != NULL);

+    //every iovec slot is in use
     if (c->iovused >= c->iovsize) {
         int i, iovnum;
         struct iovec *new_iov = (struct iovec *)realloc(c->iov,
@@ -771,7 +775,7 @@ static int add_iov(conn *c, const void *buf, int len) {
             add_msghdr(c);
             m = &c->msglist[c->msgused - 1];
         }
-
+        //make sure the iovec array has enough space
         if (ensure_iov_space(c) != 0)
             return -1;

@@ -782,7 +786,7 @@
         } else {
             leftover = 0;
         }
-
+        //fill in the iov entry with the response data
         m = &c->msglist[c->msgused - 1];
         m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
         m->msg_iov[m->msg_iovlen].iov_len = len;
@@ -896,7 +900,9 @@ static void out_string(conn *c, const char *str) {
     // ?
     c->wcurr = c->wbuf;

+    //switch to the write state
     conn_set_state(c, conn_write);
+    //the state to enter once the write is done
     c->write_and_go = conn_new_cmd;
     return;
 }
@@ -919,6 +925,7 @@ static void complete_nread_ascii(conn *c) {
     if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
         out_string(c, "CLIENT_ERROR bad data chunk");
     } else {
+        //store this item into the LRU queue and the hash table
         ret = store_item(it, comm, c);

 #ifdef ENABLE_DTRACE
@@ -951,6 +958,7 @@
     }
 #endif

+    //print the response
     switch (ret) {
     case STORED:
         out_string(c, "STORED");
@@ -970,6 +978,7 @@
     }

+    //drop this thread's reference to the item
     item_remove(c->item);       /* release the c->item reference */
     c->item = 0;
 }
@@ -1877,7 +1886,7 @@ static bool authenticated(conn *c) {
     return rv;
 }

-// dispatch the command
+// dispatch the command and set the follow-up state: read, write, closing, etc.
 static void dispatch_bin_command(conn *c) {
     int protocol_error = 0;

@@ -1949,6 +1958,7 @@
     }

     switch (c->cmd) {
+        //version
         case PROTOCOL_BINARY_CMD_VERSION:
             if (extlen == 0 && keylen == 0 && bodylen == 0) {
                 write_bin_response(c, VERSION, 0, 0, strlen(VERSION));
@@ -1956,6 +1966,7 @@
                 protocol_error = 1;
             }
             break;
+        //flush
         case PROTOCOL_BINARY_CMD_FLUSH:
             if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) {
                 bin_read_key(c, bin_read_flush_exptime, extlen);
@@ -1963,6 +1974,7 @@
                 protocol_error = 1;
             }
             break;
+        //noop
         case PROTOCOL_BINARY_CMD_NOOP:
             if (extlen == 0 && keylen == 0 && bodylen == 0) {
                 write_bin_response(c, NULL, 0, 0, 0);
@@ -1970,6 +1982,7 @@
                 protocol_error = 1;
             }
             break;
+        //set, add, replace
         case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */
         case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */
         case PROTOCOL_BINARY_CMD_REPLACE:
@@ -1979,6 +1992,7 @@
                 protocol_error = 1;
             }
             break;
+        //get
         case PROTOCOL_BINARY_CMD_GETQ: /* FALLTHROUGH */
         case PROTOCOL_BINARY_CMD_GET: /* FALLTHROUGH */
         case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
@@ -1989,6 +2003,7 @@
                 protocol_error = 1;
             }
             break;
+        //delete
         case PROTOCOL_BINARY_CMD_DELETE:
             if (keylen > 0 && extlen == 0 && bodylen == keylen) {
                 bin_read_key(c, bin_reading_del_header, extlen);
@@ -2318,7 +2333,7 @@ static void complete_nread_binary(conn *c) {
         assert(0);
     }
 }
-
+//tidy up the buffers
 static void reset_cmd_handler(conn *c) {
     c->cmd = -1;
     c->substate = bin_no_state;
@@ -2326,7 +2341,7 @@
         item_remove(c->item);
         c->item = NULL;
     }
-    conn_shrink(c);
+    conn_shrink(c);//shrink the buffers
     if (c->rbytes > 0) {
         // data has already been read in, so start parsing the command
@@ -2342,9 +2357,11 @@ static void complete_nread(conn *c) {
     assert(c->protocol == ascii_prot
            || c->protocol == binary_prot);

+    //text protocol
     if (c->protocol == ascii_prot) {
         complete_nread_ascii(c);
     } else if (c->protocol == binary_prot) {
+        //binary protocol
         complete_nread_binary(c);
     }
 }
@@ -2355,7 +2372,7 @@
  *
  * Returns the state of storage.
  */
-enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) { // comm is the command
+enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
     char *key = ITEM_key(it);
     // fetch the old item
@@ -2392,11 +2409,12 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h
     }
     // Memcached 1.2.4 added the CAS (Check and Set) protocol, analogous to Java's CAS
     // (Compare and Swap) atomic operation, to handle races when several threads update the same item
-
+    // on memcached's CAS, see also: http://www.linuxidc.com/Linux/2015-01/112507p12.htm
     // ITEM_get_cas():
     // #define ITEM_get_cas(i) (((i)->it_flags & ITEM_CAS) ? \
     //        (i)->data->cas : (uint64_t)0)
     else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
+        //the CAS values match, so perform the store
         // cas validates
         // it and old_it may belong to different classes.
         // I'm updating the stats for the one that's getting pushed out
@@ -2409,6 +2427,7 @@
         stored = STORED;
     } else {
+        //the CAS values differ, so skip the actual store
         pthread_mutex_lock(&c->thread->stats.mutex);
         c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;
         pthread_mutex_unlock(&c->thread->stats.mutex);
@@ -2522,6 +2541,10 @@ typedef struct token_s {
  *      command = tokens[ix].value;
  *   }
  */
+//parse the command string into the tokens array.
+//e.g. for the command "get aaaaaaaaaa" the parsed result stored in tokens is
+//tokens[3] = {{value:"get",length:3},{value:"aaaaaaaaaa",length:10},{value:NULL,length:0}};
+//the size_t return value is the number of entries in tokens
 static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
     char *s, *e;
     size_t ntokens = 0;
@@ -2820,7 +2843,7 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
             c->stats.buffer = NULL;
         }
     }
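The tokens[] layout described above is easy to reproduce in a few lines of standalone C. This sketch is a simplified stand-in for tokenize_command (it splits on single spaces only and skips the real function's edge cases), producing the same result for "get aaaaaaaaaa":

    /* simplified stand-in for tokenize_command; illustrative only */
    #include <stdio.h>
    #include <string.h>

    typedef struct { char *value; size_t length; } token_t;

    static size_t tokenize(char *cmd, token_t *tokens, size_t max) {
        size_t n = 0;
        for (char *s = cmd; n < max - 1; ) {
            char *e = strchr(s, ' ');
            tokens[n].value = s;
            tokens[n].length = e ? (size_t)(e - s) : strlen(s);
            n++;
            if (!e) break;
            *e = '\0';
            s = e + 1;
        }
        tokens[n].value = NULL;   /* terminator entry, as in memcached */
        tokens[n].length = 0;
        return ++n;
    }

    int main(void) {
        char cmd[] = "get aaaaaaaaaa";
        token_t tokens[8];
        size_t ntokens = tokenize(cmd, tokens, 8);
        printf("ntokens=%zu first=%s len=%zu\n",
               ntokens, tokens[0].value, tokens[1].length);  /* 3, get, 10 */
        return 0;
    }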
-
+//handle the get command
 /* ntokens is overwritten here... shrug.. */
 static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
     char *key;
@@ -2831,6 +2854,7 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
     char *suffix;
     assert(c != NULL);

+    //one get can fetch several keys at once, e.g. get key1 key2 key3
     do {
         while(key_token->length != 0) {
@@ -2907,6 +2931,7 @@
             {
                 MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
                                       it->nbytes, ITEM_get_cas(it));
+                //fill in the data to send back
                 if (add_iov(c, "VALUE ", 6) != 0 ||
                     add_iov(c, ITEM_key(it), it->nkey) != 0 ||
                     add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
@@ -2925,6 +2950,7 @@
                 c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
                 c->thread->stats.get_cmds++;
                 pthread_mutex_unlock(&c->thread->stats.mutex);
+                //bump the item in its LRU queue
                 item_update(it);
                 *(c->ilist + i) = it;
                 i++;
@@ -2944,6 +2970,7 @@
          * If the command string hasn't been fully processed, get the next set
          * of tokens.
          */
+        //if the arguments could not all be handled in one pass, call tokenize_command again
         if(key_token->value != NULL) {
             ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
             key_token = tokens;
@@ -2971,13 +2998,14 @@
         out_string(c, "SERVER_ERROR out of memory writing get response");
     }
     else {
+        //switch to conn_mwrite, ready to send the response
         conn_set_state(c, conn_mwrite);
         c->msgcurr = 0;
     }

     return;
 }
-
+//add, set, replace, prepend and append are all "update" commands and share this handler
 static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
     char *key;
     size_t nkey;
@@ -2990,16 +3018,18 @@
     assert(c != NULL);

+    //note whether the client asked the server not to send a reply
     set_noreply_maybe(c, tokens, ntokens);
-
+    //the key is too long
     if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
         out_string(c, "CLIENT_ERROR bad command line format");
         return;
     }

-    key = tokens[KEY_TOKEN].value;
-    nkey = tokens[KEY_TOKEN].length;
+    key = tokens[KEY_TOKEN].value;//the key name
+    nkey = tokens[KEY_TOKEN].length;//the key length

+    //convert the argument strings to numbers
     if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
            && safe_strtol(tokens[3].value, &exptime_int)
            && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
@@ -3033,7 +3063,7 @@
     if (settings.detail_enabled) {
         stats_prefix_record_set(key, nkey);
     }
-
+    //allocate an item of the required size
     it = item_alloc(key, nkey, flags, realtime(exptime), vlen);

     if (it == 0) {
@@ -3063,6 +3093,7 @@
     c->ritem = ITEM_data(it);
     c->rlbytes = it->nbytes;
     c->cmd = comm;
+    //the key has no value yet; re-enter the state machine to finish the rest of the command
     conn_set_state(c, conn_nread);
 }
@@ -3334,9 +3365,30 @@ static void process_slabs_automove_command(conn *c, token_t *tokens, const size_
     out_string(c, "OK");
     return;
 }
-
+// notes on process_command adapted from: http://www.cnblogs.com/lrxing/p/4273387.html
+// process_command handles every command a client may send -- get, set, add, delete,
+// replace, stats, flush_all and the rest are all dispatched from here. It takes two
+// arguments, conn *c and the string pointer char *command.
+// (conn is arguably the most important struct in memcached; a full treatment can wait.)
+// Worth noting: memcached and redis take quite different approaches to command handling.
+// In redis, the whole command set and each command's handler sit in one struct array,
+// redisCommandTable, visible at a glance; memcached has no such table -- every supported
+// command is scattered through this one function.
+// How memcached splits the command string into the command and its arguments: the function
+// uses a struct array tokens[MAX_TOKENS] to hold the result of parsing command; the
+// parsing itself is done by tokenize_command. E.g. for the command "get aaaaaaaaaa" the
+// parsed result stored in tokens is
+// tokens[3] = {{value:"get",length:3},{value:"aaaaaaaaaa",length:10},{value:NULL,length:0}};
+// tokenize_command returns ntokens, recording how many entries tokens holds; ntokens is
+// one more than the number of fields in command because the terminating '\0' entry takes
+// a slot too.
+// Walking through "get aaaaaaaaaa": when the command reaches process_command, it first
+// calls tokenize_command, which stores the parsed fields in tokens as above and returns
+// ntokens (here 3), saying how many fields command contains. That count selects which
+// group of if-statements to try; control reaches the first if below tokenize_command,
+// tokens[0].value is compared against the string "get", and on a match the matching
+// handler is called, here process_get_command(c, tokens, ntokens, false). With that,
+// process_command's job is done.
+// (these notes were written against memcached-1.4.22/memcached.c)
 static void process_command(conn *c, char *command) {
-
+    //tokens holds the parsed command
     token_t tokens[MAX_TOKENS];
     size_t ntokens;
     int comm;
@@ -3360,8 +3412,9 @@
         out_string(c, "SERVER_ERROR out of memory preparing response");
         return;
     }
-
+    //parse the command
     ntokens = tokenize_command(command, tokens, MAX_TOKENS);
+    //below, the command is matched via token.value and the matching handler is called
     if (ntokens >= 3 &&
         ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
          (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
@@ -3506,8 +3559,9 @@ static int try_read_command(conn *c) {
     assert(c != NULL);
     assert(c->rcurr <= (c->rbuf + c->rsize));
     assert(c->rbytes > 0);
-
+    //memcached supports a binary protocol and a text protocol
     if (c->protocol == negotiating_prot || c->transport == udp_transport) {
+        //PROTOCOL_BINARY_REQ is 0x80 (128); ASCII text never begins with that byte, so it marks binary_prot
         if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
             c->protocol = binary_prot;
         } else {
@@ -3519,11 +3573,12 @@
                     prot_text(c->protocol));
         }
     }
-
+    //binary protocol
     if (c->protocol == binary_prot) {
         /* Do we have the complete packet header? */
+        //fewer bytes than a packet header: the request is not complete yet
         if (c->rbytes < sizeof(c->binary_header)) {
-            /* need more data! */
+            /* need more data! return and keep reading */
             return 0;
         } else {
 #ifdef NEED_ALIGN
@@ -3559,7 +3614,7 @@
             c->binary_header.request.keylen = ntohs(req->request.keylen);
             c->binary_header.request.bodylen = ntohl(req->request.bodylen);
             c->binary_header.request.cas = ntohll(req->request.cas);
-
+            //validate the magic byte; it marks the start of every request, so a corrupted or desynchronized TCP byte stream gets caught here
             if (c->binary_header.request.magic != PROTOCOL_BINARY_REQ) {
                 if (settings.verbose) {
                     fprintf(stderr, "Invalid magic:  %x\n",
@@ -3583,19 +3638,21 @@
             /* clear the returned cas value */
             c->cas = 0;
-
+            //process the request
             dispatch_bin_command(c);

             c->rbytes -= sizeof(c->binary_header);
             c->rcurr += sizeof(c->binary_header);
         }
     } else {
+        //text protocol
         char *el, *cont;

         if (c->rbytes == 0)
             return 0;

         el = memchr(c->rcurr, '\n', c->rbytes);
+        //no complete command line has been read yet
         if (!el) {
             if (c->rbytes > 1024) {
                 /*
@@ -3617,14 +3674,15 @@
             return 0;
         }
+        //a complete command line has been read
         cont = el + 1;
+        //strip a trailing \r before the \n
         if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
             el--;
         }
         *el = '\0';

         assert(cont <= (c->rcurr + c->rbytes));
-
+        //process this command
         process_command(c, c->rcurr);

         c->rbytes -= (cont - c->rcurr);
@@ -3639,6 +3697,7 @@
 /*
  * read a UDP request.
  */
+//UDP: read data from the network
 static enum try_read_result try_read_udp(conn *c) {
     int res;

@@ -3694,6 +3753,7 @@
  *
  * @return enum try_read_result
  */
+//TCP: read data from the network
 static enum try_read_result try_read_network(conn *c) {
     enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
     int res;
@@ -3706,7 +3766,7 @@
         memmove(c->rbuf, c->rcurr, c->rbytes);
         c->rcurr = c->rbuf;
     }
-
+    //read in a loop
     while (1) {
         if (c->rbytes >= c->rsize) {
             // give up if we have reallocated more than 4 times
             }
             ++num_allocs;
             char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
+            //the allocation failed
             if (!new_rbuf) {
                 if (settings.verbose > 0)
                     fprintf(stderr, "Couldn't realloc input buffer\n");
                 c->write_and_go = conn_closing;
                 return READ_MEMORY_ERROR;
             }
-            c->rcurr = c->rbuf = new_rbuf;
-            c->rsize *= 2;
+            c->rcurr = c->rbuf = new_rbuf;//point the read buffer at the new memory
+            c->rsize *= 2;//double the read buffer's size
         }

         // compute the space still available
         int avail = c->rsize - c->rbytes;

-        // read the data.
+        // non-blocking read of the data.
         // trick: only a fixed amount is read from the socket per call; if the bytes read == avail there may be more waiting, so the loop continues, otherwise everything has been read and the loop ends
         res = read(c->sfd, c->rbuf + c->rbytes, avail);

@@ -3751,6 +3812,7 @@
         if (res == 0) {
             return READ_ERROR;
         }
+        //the non-blocking read returned -1
         if (res == -1) {
             if (errno == EAGAIN || errno == EWOULDBLOCK) {
                 break;
@@ -3760,15 +3822,15 @@
     }
     return gotdata;
 }
-
+//refresh the libevent registration: delete the event, then register it again with the new flags
 static bool update_event(conn *c, const int new_flags) {
     assert(c != NULL);

     struct event_base *base = c->event.ev_base;
     if (c->ev_flags == new_flags)
         return true;
-    if (event_del(&c->event) == -1) return false;
-    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
+    if (event_del(&c->event) == -1) return false;//delete the old event
+    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);//re-register the event
     event_base_set(base, &c->event);
     c->ev_flags = new_flags;
     if (event_add(&c->event, 0) == -1) return false;
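The read-side buffer policy above -- double rbuf when it fills, keep reading until EAGAIN -- is the standard non-blocking pattern. A stripped-down version (invented struct, error paths reduced to return codes; the real code also caps reallocations at 4 and reads at most avail bytes per call):

    /* skeleton of the try_read_network pattern: grow-by-doubling, read until EAGAIN */
    #include <errno.h>
    #include <stdlib.h>
    #include <sys/types.h>
    #include <unistd.h>

    typedef struct { int fd; char *buf; size_t used, size; } rdbuf;

    int drain(rdbuf *c) {                    /* 1=got data, 0=none, -1=error/EOF */
        int got = 0;
        for (;;) {
            if (c->used == c->size) {        /* full: double the buffer */
                char *p = realloc(c->buf, c->size * 2);
                if (!p) return -1;
                c->buf = p;
                c->size *= 2;
            }
            ssize_t res = read(c->fd, c->buf + c->used, c->size - c->used);
            if (res > 0) { c->used += (size_t)res; got = 1; continue; }
            if (res == 0) return -1;                     /* peer closed */
            if (errno == EAGAIN || errno == EWOULDBLOCK) /* drained for now */
                return got;
            return -1;
        }
    }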
@@ -3819,6 +3881,7 @@ void do_accept_new_conns(const bool do_accept) {
  *   TRANSMIT_SOFT_ERROR Can't write any more right now.
  *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
  */
+//send data to the client
 static enum transmit_result transmit(conn *c) {
     assert(c != NULL);

@@ -3898,7 +3961,7 @@ static void drive_machine(conn *c) {
         case conn_listening:
             addrlen = sizeof(addr);
-            // accept the connection
+            // accept the connection; note that this is non-blocking I/O
             if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                 // it failed
@@ -3916,7 +3979,7 @@
                 }// if
                 break;
             }
-
+            //the accept succeeded; make the new descriptor non-blocking
             if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
                 perror("setting O_NONBLOCK");
@@ -3947,14 +4010,15 @@
         // wait for a new command request
         case conn_waiting:
-            if (!update_event(c, EV_READ | EV_PERSIST)) {
+            if (!update_event(c, EV_READ | EV_PERSIST)) { //re-register the read event
                 if (settings.verbose > 0)
                     fprintf(stderr, "Couldn't update event\n");
                 conn_set_state(c, conn_closing);
                 break;
             }
-
+            //move on to the read state
             conn_set_state(c, conn_read);
+            //safe to stop here because the event is level-triggered
             stop = true;
             break;

@@ -3964,13 +4028,15 @@
             // after the read, switch the conn's state
             switch (res) {
             // READ_NO_DATA_RECEIVED, READ_DATA_RECEIVED, READ_ERROR, READ_MEMORY_ERROR are only returned by try_read_udp() and try_read_network()
+            // no data was read
             case READ_NO_DATA_RECEIVED:
                 conn_set_state(c, conn_waiting);
                 break;
-            // the read succeeded
+            // the read succeeded; go parse the data
             case READ_DATA_RECEIVED:
                 conn_set_state(c, conn_parse_cmd);
                 break;
+            // the read failed
             case READ_ERROR:
                 conn_set_state(c, conn_closing);
                 break;
@@ -3982,6 +4048,11 @@
         // try to parse a command
         case conn_parse_cmd :
+            //try_read_command returns 1 when a complete command has been read and is being
+            //handled, and 0 when more socket data is needed before a command can be parsed.
+            //once a full command is available it is parsed internally and handed to
+            //process_command; commands like set, add and replace call
+            //conn_set_state(c, conn_nread) while being handled
             if (try_read_command(c) == 0) {
                 /* we need more data! */
                 conn_set_state(c, conn_waiting);
@@ -3994,6 +4065,7 @@
             /* Only process nreqs at a time to avoid starving other
                connections */
+            //nreqs caps how many commands one client may run back to back
             --nreqs;
             if (nreqs >= 0) {
                 // adjust the conn's state, correcting it to
@@ -4024,9 +4096,11 @@
             break;

         // this is where the command actually gets executed
+        //for a worked example see http://www.oschina.net/question/1441503_194178
         case conn_nread:
             // if all the data has been read already, execute the command right away
             if (c->rlbytes == 0) {
+                //finish the work on the item, including inserting it into the LRU queue and the hash table
                 complete_nread(c);
                 break;
             }

             // check whether unread data remains in the buffer
             if (c->rbytes > 0) {
                 int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
+                //for add, set, replace and the like, this is where the key's value data actually gets copied in
                 if (c->ritem != c->rcurr) {
                     memmove(c->ritem, c->rcurr, tocopy);
                 }
@@ -4049,6 +4124,7 @@
             /*  now try reading from the socket */
             res = read(c->sfd, c->ritem, c->rlbytes);
+            //as long as the socket read does not fail, the state machine stays in conn_nread
             if (res > 0) {
                 pthread_mutex_lock(&c->thread->stats.mutex);
                 c->thread->stats.bytes_read += res;
@@ -4163,7 +4239,9 @@
                 conn_set_state(c, conn_closing);
                 break;
             }
+            //transmit sends the data back to the client
             switch (transmit(c)) {
+            //all data has been sent
             case TRANSMIT_COMPLETE:
                 if (c->state == conn_mwrite) {
@@ -4186,7 +4264,7 @@
                     if(c->protocol == binary_prot) {
                         conn_set_state(c, c->write_and_go);
                     } else {
-                        // wait for the next command
+                        // wait for the next command: back to the initial conn_new_cmd state
                         conn_set_state(c, conn_new_cmd);
                     }

@@ -4247,21 +4325,21 @@ void event_handler(const int fd, const short which, void *arg) {
         conn_close(c);
         return;
     }
-
+    //enter the state machine to do the processing
     drive_machine(c);

     /* wait for next event */
     return;
 }
-
+//create a socket
 static int new_socket(struct addrinfo *ai) {
     int sfd;
     int flags;
-
+    //get a socket descriptor
     if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
         return -1;
     }
-
+    //make it non-blocking with fcntl
     if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
         fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
         perror("setting O_NONBLOCK");
@@ -4275,6 +4353,7 @@
 /*
  * Sets a socket's send buffer size to the maximum allowed by the system.
  */
+//grow the socket's send buffer to the maximum the system allows
 static void maximize_sndbuf(const int sfd) {
     socklen_t intsize = sizeof(int);
     int last_good = 0;
@@ -4282,6 +4361,7 @@
     int old_size;

     /* Start with the default size. */
+    //the SO_SNDBUF option reads and sets the send-buffer size
     if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
         if (settings.verbose > 0)
             perror("getsockopt(SO_SNDBUF)");
@@ -4289,6 +4369,7 @@
     }

     /* Binary-search for the real maximum. */
+    //a binary search closes in on the real maximum step by step -- a neat design worth learning;
+    //MAX_SENDBUF_SIZE is 256 * 1024 * 1024
     min = old_size;
     max = MAX_SENDBUF_SIZE;
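Since the diff only shows the setup, here is the whole binary-search idea in one runnable function -- a sketch whose error handling and logging are simplified relative to memcached's maximize_sndbuf:

    /* probe the largest SO_SNDBUF the kernel will accept, by bisection */
    #include <sys/socket.h>

    #define MAX_SENDBUF_SIZE (256 * 1024 * 1024)

    void max_sndbuf(int sfd) {
        socklen_t intsize = sizeof(int);
        int old_size, min, max, avg, last_good = 0;

        if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
            return;
        min = old_size;
        max = MAX_SENDBUF_SIZE;
        while (min <= max) {
            avg = (min + max) / 2;
            if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &avg, intsize) == 0) {
                last_good = avg;     /* the kernel accepted it: try bigger */
                min = avg + 1;
            } else {
                max = avg - 1;       /* rejected: try smaller */
            }
        }
        (void)last_good;             /* the real code logs this value */
    }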
@@ -4315,6 +4396,7 @@
  * when they are successfully added to the list of ports we
  * listen on.
  */
+//create the listening socket and bind the port
 static int server_socket(const char *interface,
                          int port,
                          enum network_transport transport,
@@ -4329,7 +4411,7 @@
     int error;
     int success = 0;
     int flags =1;
-
+    //UDP or TCP?
     hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

     if (port == -1) {
@@ -4338,6 +4420,8 @@
     snprintf(port_buf, sizeof(port_buf), "%d", port);

     // getaddrinfo handles both name-to-address and service-to-port translation, and returns a pointer to a list of addrinfo structures rather than a flat list of addresses.
+    //getaddrinfo maps the host address and port number to socket address information, returned through ai
+    //see UNP p.245, getaddrinfo
     error= getaddrinfo(interface, port_buf, &hints, &ai);

     if (error != 0) {
@@ -4348,6 +4432,10 @@
         return 1;
     }

+    //getaddrinfo returns several addrinfo structs in two cases:
+    //1. the address associated with the interface argument has several entries: one struct
+    //   is returned for each address that fits the requested address family
+    //2. the service named in port_buf is available for several socket types: one struct
+    //   may be returned per socket type
+    //reference: http://blog.csdn.net/lcli2009/article/details/21609871
     for (next= ai; next; next= next->ai_next) {
         conn *listen_conn_add;

@@ -4366,6 +4454,7 @@
 #ifdef IPV6_V6ONLY
         if (next->ai_family == AF_INET6) {
+            //set the IPv6-only option so this socket accepts only IPv6 traffic
             error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
             if (error != 0) {
                 perror("setsockopt");
@@ -4375,20 +4464,21 @@
             }
         }
 #endif

-        // set the port for reuse
+        //set the SO_REUSEADDR option to allow address reuse; see UNP p.151
         setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
-
+        //for UDP, grow the send buffer
         if (IS_UDP(transport)) {
             maximize_sndbuf(sfd);
         } else {
+            //SO_KEEPALIVE keeps the connection alive and detects a crashed peer
             error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
             if (error != 0)
                 perror("setsockopt");
-
+            //SO_LINGER: on close, data still in the buffer keeps being sent
             error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
             if (error != 0)
                 perror("setsockopt");
-
+            //TCP_NODELAY disables the Nagle algorithm, lowering latency
             error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
             if (error != 0)
                 perror("setsockopt");
@@ -4467,19 +4557,22 @@
             listen_conn_add->next = listen_conn;
             listen_conn = listen_conn_add;
         }
-    }
-
+    }// end for
+    //free the address list
     freeaddrinfo(ai);

     /* Return zero iff we detected no errors in starting up connections */
     return success == 0;
 }

+//bind and listen on the configured IPs and port
 static int server_sockets(int port, enum network_transport transport,
                           FILE *portnumber_file) {
     if (settings.inter == NULL) {
+        //do the bind and listen
         return server_socket(settings.inter, port, transport, portnumber_file);
     } else {
+        //if the server has several IPs, a memcached listener can be bound on every
+        //(ip, port) pair; the input list below is parsed and then each entry is bound
         // tokenize them and bind to each one of them..
         char *b;
         int ret = 0;
@@ -4494,6 +4587,7 @@
         }

         // strtok_r is the thread-safe variant of strtok on Linux; the second argument lists the delimiters, and more than one may be given.
+        //one bind per loop iteration
         for (char *p = strtok_r(list, ";,", &b);
              p != NULL;
              p = strtok_r(NULL, ";,", &b)) {
@@ -5392,7 +5486,7 @@ int main (int argc, char **argv) {
         exit(EX_OSERR);
     }

-    // start the worker threads
+    // create and start the worker threads, passing the thread count and the libevent instance
     /* start up worker threads if MT mode */
     thread_init(settings.num_threads, main_base);

@@ -5410,14 +5504,10 @@
     // initialize the timer event
     clock_handler(0, 0, 0);

-    /**
-     memcached has two configurable modes, unix domain sockets and TCP/UDP; clients may send requests either way. When client and server share a host a unix domain socket will do, otherwise use TCP/UDP. The two modes are mutually exclusive.
-     The code below picks the mode based on the value of settings.socketpath.
-     */
+    //memcached has two configurable modes, unix domain sockets and TCP/UDP; clients may send requests either way. When client and server share a host a unix domain socket will do, otherwise use TCP/UDP. The two modes are mutually exclusive.
+    //the code below picks the mode based on the value of settings.socketpath.

-    /**
-     Mode one: the unix domain socket.
-     */
+    //mode one: the unix domain socket.
     /* create unix mode sockets after dropping privileges */
     if (settings.socketpath != NULL) {
         errno = 0;
@@ -5427,9 +5517,7 @@
         }
     }

-    /**
-     Mode two: TCP/UDP.
-     */
+    //mode two: TCP/UDP.
     /* create the listening socket, bind it, and init */
     if (settings.socketpath == NULL) {
         const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
diff --git a/memcached-1.4.15/memcached.h b/memcached-1.4.15/memcached.h
index 5d161fe..5f54b1d 100644
--- a/memcached-1.4.15/memcached.h
+++ b/memcached-1.4.15/memcached.h
@@ -219,6 +219,7 @@ struct slab_stats {

 /**
  * Stats stored per-thread.
+ * per-thread statistics
  */
 struct thread_stats {
     pthread_mutex_t mutex;
@@ -241,6 +242,7 @@
 /**
  * Global stats.
+ * global statistics
  */
 struct stats {
     pthread_mutex_t mutex;
@@ -381,7 +383,10 @@ typedef struct _stritem {

 // several threads, each with its own event_base
 typedef struct {
+    //the thread's ID
     pthread_t thread_id;        /* unique ID of this thread */
+
+    //the libevent event_base instance; one per thread
     struct event_base *base;    /* libevent handle this thread uses */

     // event struct used to watch the notify pipe for read events
@@ -391,15 +396,18 @@
     int notify_receive_fd;      /* receiving end of notify pipe */
     int notify_send_fd;         /* sending end of notify pipe */

-    // the thread's state
+    // the thread's state -- really a set of statistics
     struct thread_stats stats;  /* Stats generated by this thread */

     // the queue of connections this thread has to handle
     struct conn_queue *new_conn_queue; /* queue of new connections to handle */

     cache_t *suffix_cache;      /* suffix cache */
+
+    //which lock type this thread uses: fine-grained (segment) or global
     uint8_t item_lock_type;     /* use fine-grained or global item lock */
 } LIBEVENT_THREAD;

+//struct for the dispatcher thread
 typedef struct {
     pthread_t thread_id;        /* unique ID of this thread */
     struct event_base *base;    /* libevent handle this thread uses */
diff --git a/memcached-1.4.15/slabs.c b/memcached-1.4.15/slabs.c
index 1281069..5b94453 100644
--- a/memcached-1.4.15/slabs.c
+++ b/memcached-1.4.15/slabs.c
@@ -45,15 +45,15 @@ typedef struct {
     unsigned int killing;  /* index+1 of dying slab, or zero if none */
     size_t requested; /* The number of requested bytes */
 } slabclass_t;
-
+//static globals for managing the memory pool
 static slabclass_t slabclass[MAX_NUMBER_OF_SLAB_CLASSES];
-static size_t mem_limit = 0;
-static size_t mem_malloced = 0;
+static size_t mem_limit = 0; //total size of the pool
+static size_t mem_malloced = 0;
 static int power_largest;

-static void *mem_base = NULL;
-static void *mem_current = NULL;
-static size_t mem_avail = 0;
+static void *mem_base = NULL; //start address of the pool
+static void *mem_current = NULL; //current position in the pool
+static size_t mem_avail = 0; //memory still available

 /**
  * Access to the slab allocator is protected by this lock
@@ -83,7 +83,7 @@ static void slabs_preallocate (const unsigned int maxslabs);
  * Given object size, return id to use when allocating/freeing memory for object
  * 0 means error: can't store such a large object
 */
-
+//memcached locates items with slabs_clsid: pass in the item size, get back the class id
 unsigned int slabs_clsid(const size_t size) {
     int res = POWER_SMALLEST;
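To see what the growth-factor loop in slabs_init (next hunk) actually produces, this sketch prints the first few chunk sizes under assumed defaults -- a 96-byte starting size standing in for sizeof(item)+chunk_size, factor 1.25, 8-byte alignment, 1MB pages:

    /* reproduce slabs_init's size progression; assumed defaults, illustrative */
    #include <stdio.h>

    #define CHUNK_ALIGN_BYTES 8

    int main(void) {
        double factor = 1.25;          /* memcached's default -f */
        unsigned int size = 96;        /* stand-in for sizeof(item)+chunk_size */
        unsigned int item_size_max = 1024 * 1024;   /* 1MB page */

        for (int i = 1; size <= item_size_max / factor && i <= 10; i++) {
            if (size % CHUNK_ALIGN_BYTES)           /* 8-byte alignment */
                size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
            printf("class %2d: chunk %7u, %5u per 1MB slab\n",
                   i, size, item_size_max / size);
            size *= factor;            /* truncates, as the real code does */
        }
        return 0;
    }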
@@ -99,15 +99,16 @@
  * Determines the chunk sizes and initializes the slab class descriptors
  * accordingly.
  */
+//on memcached's memory management see: http://basiccoder.com/memcached-memory-mamagement.html and http://kenby.iteye.com/blog/1423989
 void slabs_init(const size_t limit, const double factor, const bool prealloc) {
     int i = POWER_SMALLEST - 1;
     unsigned int size = sizeof(item) + settings.chunk_size;

     mem_limit = limit;
-
+    //preallocation is supported
     if (prealloc) {
         /* Allocate everything in a big chunk with malloc */
-        mem_base = malloc(mem_limit);
+        mem_base = malloc(mem_limit);//allocate the whole pool; mem_base keeps its address
         if (mem_base != NULL) {
             mem_current = mem_base;
             mem_avail = mem_limit;
@@ -116,23 +117,23 @@
                     " one large chunk.\nWill allocate in smaller chunks\n");
         }
     }
-
+    //zero out the slabclass array
     memset(slabclass, 0, sizeof(slabclass));
-
+    //set up the classes; each class's chunk size grows by the growth factor
     while (++i < POWER_LARGEST && size <= settings.item_size_max / factor) {
         /* Make sure items are always n-byte aligned */
-        if (size % CHUNK_ALIGN_BYTES)
+        if (size % CHUNK_ALIGN_BYTES)//align the size; CHUNK_ALIGN_BYTES is 8 bytes
             size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);

         slabclass[i].size = size;
         slabclass[i].perslab = settings.item_size_max / slabclass[i].size;
-        size *= factor;
+        size *= factor; //the next size is the current one times the growth factor

         if (settings.verbose > 1) {
             fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
                     i, slabclass[i].size, slabclass[i].perslab);
         }
     }
-
+    //when the loop ends, size has grown to 1MB
     power_largest = i;
     slabclass[power_largest].size = settings.item_size_max;
     slabclass[power_largest].perslab = 1;
@@ -149,7 +150,7 @@
     }

-
+    //preallocate each slab's memory, passing the largest initialized slab class id
     if (prealloc) {
         slabs_preallocate(power_largest);
     }
@@ -168,6 +169,7 @@ static void slabs_preallocate (const unsigned int maxslabs) {
     for (i = POWER_SMALLEST; i <= POWER_LARGEST; i++) {
         if (++prealloc > maxslabs)
             return;
+        //allocate a slab page for the i-th slabclass
         if (do_slabs_newslab(i) == 0) {
             fprintf(stderr, "Error while preallocating slab memory!\n"
                 "If using -L or other prealloc options, max memory must be "
@@ -184,12 +186,13 @@ static int grow_slab_list (const unsigned int id) {
         size_t new_size =  (p->list_size != 0) ? p->list_size * 2 : 16;
         void *new_list = realloc(p->slab_list, new_size * sizeof(void *));
         if (new_list == 0) return 0;
-        p->list_size = new_size;
+        p->list_size = new_size;//update the id-th slabclass's bookkeeping
         p->slab_list = new_list;
     }
     return 1;
 }

+//split the memory at ptr into free chunks of the id-th slabclass's chunk size
 static void split_slab_page_into_freelist(char *ptr, const unsigned int id) {
     slabclass_t *p = &slabclass[id];
     int x;
@@ -266,6 +269,7 @@ static void *do_slabs_alloc(const size_t size, unsigned int id) {
     return ret;
 }

+//put the chunk back on the slabclass's free list
 static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {
     slabclass_t *p;
     item *it;
@@ -379,27 +383,27 @@ static void do_slabs_stats(ADD_STAT add_stats, void *c) {
     APPEND_STAT("total_malloced", "%llu", (unsigned long long)mem_malloced);
     add_stats(NULL, 0, NULL, 0, c);
 }
-
+//carve size bytes out of the memory pool
 static void *memory_allocate(size_t size) {
     void *ret;

-    // mem_base is the memory memcached preallocated; serving allocations from it speeds things up
+    //if no pool was created, fall back to the system allocator
     if (mem_base == NULL) {
         /* We are not using a preallocated large memory chunk */
         ret = malloc(size);
     } else {
         ret = mem_current;
-
+        //not enough space left
         if (size > mem_avail) {
             return NULL;
         }

         /* mem_current pointer _must_ be aligned!!! */
+        //align the size to the chunk boundary
         if (size % CHUNK_ALIGN_BYTES) {
             size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
         }
-
+        //advance the current-position pointer
         mem_current = ((char*)mem_current) + size;
         if (size < mem_avail) {
             mem_avail -= size;
diff --git a/memcached-1.4.15/slabs.h b/memcached-1.4.15/slabs.h
index bad0659..a881675 100644
--- a/memcached-1.4.15/slabs.h
+++ b/memcached-1.4.15/slabs.h
@@ -9,6 +9,7 @@
    3rd argument specifies if the slab allocator should allocate all memory
    up front (if true), or allocate memory in chunks as it is needed (if false)
 */
+//memory initialization; settings.maxbytes is the memory size given at startup, settings.factor the growth factor
 void slabs_init(const size_t limit, const double factor, const bool prealloc);
diff --git a/memcached-1.4.15/thread.c b/memcached-1.4.15/thread.c
index ec7ab32..faa1c76 100644
--- a/memcached-1.4.15/thread.c
+++ b/memcached-1.4.15/thread.c
@@ -19,7 +19,9 @@

 /* An item in the connection queue. */
 typedef struct conn_queue_item CQ_ITEM;
+//one element of the connection queue
 struct conn_queue_item {
+    //file descriptor of the accepted socket
     int sfd;
     enum conn_states init_state;
     int event_flags;
@@ -29,11 +31,16 @@
 };

 /* A connection queue. */
+//the connection queue
 typedef struct conn_queue CQ;
 struct conn_queue {
+    //head pointer
     CQ_ITEM *head;
+    //tail pointer
     CQ_ITEM *tail;
+    //lock
     pthread_mutex_t lock;
+    //condition variable
     pthread_cond_t cond;
 };

@@ -81,9 +88,10 @@ static pthread_cond_t init_cond;

 static void thread_libevent_process(int fd, short which, void *arg);

-
+//increment the reference count
 unsigned short refcount_incr(unsigned short *refcount) {
 #ifdef HAVE_GCC_ATOMICS
+    //__sync_add_and_fetch is a gcc atomic builtin
     return __sync_add_and_fetch(refcount, 1);
 #elif defined(__sun)
     return atomic_inc_ushort_nv(refcount);
@@ -96,9 +104,10 @@
     return res;
 #endif
 }
-
+//decrement the reference count
 unsigned short refcount_decr(unsigned short *refcount) {
 #ifdef HAVE_GCC_ATOMICS
+    //__sync_sub_and_fetch is a gcc atomic builtin
     return __sync_sub_and_fetch(refcount, 1);
 #elif defined(__sun)
     return atomic_dec_ushort_nv(refcount);
@@ -120,12 +129,15 @@
 void item_lock_global(void) {
     mutex_lock(&item_global_lock);
 }

 void item_unlock_global(void) {
     mutex_unlock(&item_global_lock);
 }
-
+//lock an item
 void item_lock(uint32_t hv) {
+    //fetch the thread-local lock type
     uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
     if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
+        //lock only the segment this bucket maps to
         mutex_lock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);
     } else {
+        //lock all items at once: the global level
         mutex_lock(&item_global_lock);
     }
 }
@@ -173,16 +185,17 @@ static void register_thread_initialized(void) {
     pthread_cond_signal(&init_cond);
     pthread_mutex_unlock(&init_lock);
 }
-
+//notify the workers to switch their lock type
 void switch_item_lock_type(enum item_lock_types type) {
     char buf[1];
     int i;

     switch (type) {
-        // the state to change to
+        //'l' stands for ITEM_LOCK_GRANULAR, the segment-level lock
         case ITEM_LOCK_GRANULAR:
             buf[0] = 'l';
             break;
+        //'g' stands for ITEM_LOCK_GLOBAL, the global lock
         case ITEM_LOCK_GLOBAL:
             buf[0] = 'g';
             break;
@@ -195,11 +208,13 @@
     pthread_mutex_lock(&init_lock);
     init_count = 0;
     for (i = 0; i < settings.num_threads; i++) {
+        //notify each worker by writing one character into the pipe it watches
         if (write(threads[i].notify_send_fd, buf, 1) != 1) {
             perror("Failed writing to notify pipe");
             /* TODO: This is a fatal problem. Can it ever happen temporarily? */
         }
     }
+    //wait until every worker has switched to the lock type named by type
     wait_for_thread_registration(settings.num_threads);
     pthread_mutex_unlock(&init_lock);
 }
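The bucket-to-lock mapping in item_lock is just a modulo over a smaller power-of-two lock table. A sketch of the arithmetic (lock counts and hashpower assumed, roughly matching the power values thread_init chooses):

    /* segment-lock table in the spirit of thread_init/item_lock; illustrative */
    #include <pthread.h>
    #include <stdint.h>
    #include <stdlib.h>

    #define hashsize(n) ((uint32_t)1 << (n))
    #define hashmask(n) (hashsize(n) - 1)

    static pthread_mutex_t *item_locks;
    static uint32_t item_lock_count;
    static uint32_t hashpower = 16;           /* hash table has 2^16 buckets */

    void locks_init(int power) {              /* power is 10..13, by thread count */
        item_lock_count = hashsize(power);
        item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
        for (uint32_t i = 0; i < item_lock_count; i++)
            pthread_mutex_init(&item_locks[i], NULL);
    }

    void lock_item(uint32_t hv) {
        /* many buckets share one lock: bucket index mod lock count */
        pthread_mutex_lock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);
    }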
@@ -207,6 +222,7 @@
 /*
  * Initializes a connection queue.
  */
+//initialize a connection queue
 static void cq_init(CQ *cq) {
     pthread_mutex_init(&cq->lock, NULL);
     pthread_cond_init(&cq->cond, NULL);
@@ -219,12 +235,15 @@
  * one.
  * Returns the item, or NULL if no item is available
  */
+//pop one connection off the queue
 static CQ_ITEM *cq_pop(CQ *cq) {
     CQ_ITEM *item;

     pthread_mutex_lock(&cq->lock);
+    //take the head
     item = cq->head;
     if (NULL != item) {
+        //the head is not empty, so advance the head pointer
         cq->head = item->next;
         if (NULL == cq->head)
             cq->tail = NULL;
@@ -236,8 +255,8 @@
 /*
  * Adds an item to a connection queue.
  */
+//push a new connection onto the queue
 static void cq_push(CQ *cq, CQ_ITEM *item) {
     item->next = NULL;

@@ -255,6 +274,7 @@
 /*
  * Returns a fresh connection queue item.
  */
+//allocate a new connection queue item
 static CQ_ITEM *cqi_new(void) {
     CQ_ITEM *item = NULL;
     pthread_mutex_lock(&cqi_freelist_lock);
@@ -268,6 +288,7 @@
         int i;

         /* Allocate a bunch of items at once to reduce fragmentation */
+        //allocate space for many items in one go
         item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
         if (NULL == item)
             return NULL;
@@ -293,6 +314,7 @@
 /*
  * Frees a connection queue item (adds it to the freelist.)
  */
+//free a queue item by putting it on the free list
 static void cqi_free(CQ_ITEM *item) {
     pthread_mutex_lock(&cqi_freelist_lock);
     item->next = cqi_freelist;
@@ -303,7 +325,7 @@
 /*
  * Creates a worker thread.
- * start a thread
+ * create a worker thread
 */
 static void create_worker(void *(*func)(void *), void *arg) {
     pthread_t       thread;
     pthread_attr_t  attr;
     int             ret;

     pthread_attr_init(&attr);
-
+    //the thread routine is worker_libevent
     if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
         fprintf(stderr, "Can't create thread: %s\n",
                 strerror(ret));
@@ -338,6 +360,7 @@ void accept_new_conns(const bool do_accept) {

 // initialize the mutexes
 // etc.
 static void setup_thread(LIBEVENT_THREAD *me) {
+    //initialize the event_base; see the libevent manual
     me->base = event_init();
     if (! me->base) {
         fprintf(stderr, "Can't allocate event base\n");
@@ -348,6 +371,7 @@

     // while the thread struct is initialized, register a read event for the me->notify_receive_fd pipe; the callback is thread_libevent_process()
     event_set(&me->notify_event, me->notify_receive_fd,
               EV_READ | EV_PERSIST, thread_libevent_process, me);
+    //attach notify_event to this thread's event_base; see the libevent manual
     event_base_set(me->base, &me->notify_event);

     if (event_add(&me->notify_event, 0) == -1) {
@@ -355,7 +379,7 @@
         exit(1);
     }

-    // initialize this thread's work queue
+    // create and initialize this thread's connection queue
     me->new_conn_queue = malloc(sizeof(struct conn_queue));
     if (me->new_conn_queue == NULL) {
         perror("Failed to allocate memory for connection queue");
@@ -393,6 +417,7 @@ static void *worker_libevent(void *arg) {
      * all item_lock calls... */
     me->item_lock_type = ITEM_LOCK_GRANULAR;
+    //store the lock level in the thread's private data
     pthread_setspecific(item_lock_type_key, &me->item_lock_type);

     register_thread_initialized();
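cq_push and cq_pop above form the producer/consumer handoff between the dispatcher and a worker. A self-contained miniature of the same lock discipline (types invented for the demo; the queue's mutex is assumed already initialized, e.g. with PTHREAD_MUTEX_INITIALIZER):

    /* miniature of the CQ handoff: the dispatcher pushes, a worker pops */
    #include <pthread.h>
    #include <stdlib.h>

    typedef struct qitem { struct qitem *next; int sfd; } qitem;
    typedef struct { qitem *head, *tail; pthread_mutex_t lock; } queue;

    void q_push(queue *q, qitem *it) {
        it->next = NULL;
        pthread_mutex_lock(&q->lock);
        if (q->tail) q->tail->next = it; else q->head = it;
        q->tail = it;
        pthread_mutex_unlock(&q->lock);
    }

    qitem *q_pop(queue *q) {               /* NULL when empty, like cq_pop */
        pthread_mutex_lock(&q->lock);
        qitem *it = q->head;
        if (it) {
            q->head = it->next;
            if (!q->head) q->tail = NULL;
        }
        pthread_mutex_unlock(&q->lock);
        return it;
    }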
@@ -425,6 +450,7 @@ static void thread_libevent_process(int fd, short which, void *arg) {

     if (NULL != item) {
         // build a conn struct for the new request. The connection itself already exists; this only fills in the struct. The key action is registering the event with libevent, with event_handler() as the callback
+        //event_handler eventually drops into the request-processing state machine
         conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                            item->read_buffer_size, item->transport, me->base);
         if (c == NULL) {
@@ -446,11 +472,12 @@
         break;

     /* we were told to flip the lock type and report in */
+    //switch to the segment lock
     case 'l':
         me->item_lock_type = ITEM_LOCK_GRANULAR;
         register_thread_initialized();
         break;
-
+    //switch to the global lock
     case 'g':
         me->item_lock_type = ITEM_LOCK_GLOBAL;
         register_thread_initialized();
@@ -477,7 +504,7 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
     char buf[1];

     // the pool holds several threads, each with its own work queue; a thread's job is to take tasks off its queue and run them, waiting whenever the queue is empty
-    // compute the thread-info index
+    // pick a worker thread by round-robin
     int tid = (last_thread + 1) % settings.num_threads;

     // threads is a global array of LIBEVENT_THREAD
@@ -811,7 +838,7 @@ void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
  * nthreads  Number of worker event handler threads to spawn
  * main_base Event base for main thread
-              the dispatch thread
+              the dispatcher thread's libevent instance
 */
 void thread_init(int nthreads, struct event_base *main_base) {
     int         i;
@@ -829,6 +856,7 @@

     /* Want a wide lock table, but don't waste memory */
     // the lock table?
+    // nthreads is the number of worker threads
     if (nthreads < 3) {
         power = 10;
     } else if (nthreads < 4) {
@@ -841,24 +869,29 @@
         power = 13;
     }

-    // why pre-allocate this many locks?
-    // hashsize = 2^n
+    // memcached has two locking schemes; see http://blog.csdn.net/luotuo44/article/details/42913549
+    // hashsize = 2^n; several buckets share one lock, which is why so many locks are needed
     item_lock_count = hashsize(power);
-
+    // allocate the lock array
     item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
     if (! item_locks) {
         perror("Can't allocate item locks");
         exit(1);
     }
-    // initialize
+    // initialize the locks
     for (i = 0; i < item_lock_count; i++) {
         pthread_mutex_init(&item_locks[i], NULL);
     }
+
+    // create the thread-local key item_lock_type_key; it stores the lock type each thread
+    // uses for the main hash table: the global lock while the table is being expanded,
+    // the segment (granular) locks otherwise
+    // reference: http://blog.csdn.net/lcli2009/article/details/21525839
     pthread_key_create(&item_lock_type_key, NULL);
     pthread_mutex_init(&item_global_lock, NULL);

     // LIBEVENT_THREAD couples the libevent state: an event_base plus the notify pipe fds
+    //allocate space for nthreads worker structs
     threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
     if (! threads) {
         perror("Can't allocate thread descriptors");
@@ -901,4 +934,3 @@
     wait_for_thread_registration(nthreads);
     pthread_mutex_unlock(&init_lock);
 }
-
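Putting the pieces of this file together, the dispatch path boils down to: pick a worker round-robin, queue the connection item, and write one byte into that worker's notify pipe so its libevent loop wakes up. A compressed sketch (names, the worker struct, and the thread count are placeholders; the queue push is stubbed out):

    /* shape of dispatch_conn_new: round-robin choice plus one-byte pipe wakeup */
    #include <stdio.h>
    #include <unistd.h>

    #define NUM_THREADS 4

    struct worker { int notify_send_fd; /* plus its conn queue */ };
    static struct worker workers[NUM_THREADS];
    static int last_thread = -1;

    void dispatch(int sfd) {
        int tid = (last_thread + 1) % NUM_THREADS;   /* round-robin */
        last_thread = tid;

        /* cq_push(workers[tid].queue, item) would go here */

        char buf[1] = {'c'};                         /* 'c' = new connection */
        if (write(workers[tid].notify_send_fd, buf, 1) != 1)
            perror("Failed writing to notify pipe");
        (void)sfd;
    }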