利志分享
view_headline
开发工具箱
go教程
clickhouse教程
kafka教程
python教程
shell教程
原创杂文
开发工具箱
go教程
clickhouse教程
kafka教程
python教程
shell教程
原创杂文
深入理解redis的一个del和unlink的命令的执行过程-2
原创杂文 / 时间:2021-10-07 23:11:41 / 阅读:342 / 分享:0
继续上一篇文章继续讲,上次我们讲了del涉及到的同步删除的整个逻辑,del删除会通过参数走到dbSyncDelete方法,然而unlink则会走dbAsyncDelete方法。这里我们直接从dbAsyncDelete这个方法开始讲。 dbAsyncDelete方法的执行是在redis/src/lazyfree.c里面。 ``` /* Delete a key, value, and associated expiration entry if any, from the DB. * If there are enough allocations to free the value object may be put into * a lazy free list instead of being freed synchronously. The lazy free list * will be reclaimed in a different bio.c thread. */ #define LAZYFREE_THRESHOLD 64 int dbAsyncDelete(redisDb *db, robj *key) { /* 从db->expires中删除key,只是删除其指针而已,并没有删除实际值 */ if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); /* If the value is composed of a few allocations, to free in a lazy way * is actually just slower... So under a certain limit we just free * the object synchronously. */ /* * 在字典中摘除这个key(没有真正删除,只是查不到而已),如果被摘除的dictEntry不为 * 空就去执行下面的释放逻辑 */ dictEntry *de = dictUnlink(db->dict,key->ptr); if (de) { robj *val = dictGetVal(de); /* Tells the module that the key has been unlinked from the database. */ moduleNotifyKeyUnlink(key,val); /* lazy_free并不是完全异步的,而是先评估释放操作所需工作量,如果影响较小就直接在主线程中删除了 */ size_t free_effort = lazyfreeGetFreeEffort(key,val); /* If releasing the object is too much work, do it in the background * by adding the object to the lazy free list. * Note that if the object is shared, to reclaim it now it is not * possible. This rarely happens, however sometimes the implementation * of parts of the Redis core may call incrRefCount() to protect * objects, and then call dbDelete(). In this case we'll fall * through and reach the dictFreeUnlinkedEntry() call, that will be * equivalent to just calling decrRefCount(). * 如果释放这个对象需要做大量的工作,就把他放到异步线程里做 * 但如果这个对象是共享对象(refcount > 1)就不能直接释放了,当然这很少发送,但有可能redis * 核心会调用incrRefCount来保护对象,然后调用dbDelete。这我只需要直接调用dictFreeUnlinkedEntry, * 等价于调用decrRefCount */ if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) { // 异步释放+1,原子操作 atomicIncr(lazyfree_objects,1); // 将 value 的释放添加到异步线程队列中去,后台处理, 任务类型为 异步释放内存 bioCreateLazyFreeJob(lazyfreeFreeObject,1, val); 设置val为NULL, 以便在外部进行删除时忽略释放value相关内存 dictSetVal(db->dict,de,NULL); } } /* 释放键值对所占用的内存,如果是lazyFree,val已经是null了,只需要释放key的内存即可 */ if (de) { dictFreeUnlinkedEntry(db->dict,de); if (server.cluster_enabled) slotToKeyDel(key->ptr); return 1; } else { return 0; } } ``` 上面的函数执行之后,就是添加异步任务到线程中,执行bioCreateLazyFreeJob函数,再执行bioSubmitJob,方法在redis/src/bio.c文件中 ``` void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) { va_list valist; /* Allocate memory for the job structure and all required * arguments */ struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count)); job->free_fn = free_fn; va_start(valist, arg_count); for (int i = 0; i < arg_count; i++) { job->free_args[i] = va_arg(valist, void *); } va_end(valist); bioSubmitJob(BIO_LAZY_FREE, job); } void bioSubmitJob(int type, struct bio_job *job) { job->time = time(NULL); // 多线程需要加锁,把待处理的job添加到队列末尾 pthread_mutex_lock(&bio_mutex[type]); listAddNodeTail(bio_jobs[type],job); bio_pending[type]++; // 唤醒任务线程 pthread_cond_signal(&bio_newjob_cond[type]); pthread_mutex_unlock(&bio_mutex[type]); } ``` 后台线程任务框架调用的方法是bioProcessBackgroundJobs,线程根据不同的类型的后台线程执行相关操作。方法位于redis/src/bio.c。 ``` /* 根据参数创建不同的后台线程 */ void *bioProcessBackgroundJobs(void *arg) { struct bio_job *job; unsigned long type = (unsigned long) arg; sigset_t sigset; /* Check that the type is within the right interval. */ if (type >= BIO_NUM_OPS) { serverLog(LL_WARNING, "Warning: bio thread started with wrong type %lu",type); return NULL; } switch (type) { case BIO_CLOSE_FILE: redis_set_thread_title("bio_close_file"); break; case BIO_AOF_FSYNC: redis_set_thread_title("bio_aof_fsync"); break; case BIO_LAZY_FREE: redis_set_thread_title("bio_lazy_free"); break; } redisSetCpuAffinity(server.bio_cpulist); makeThreadKillable(); pthread_mutex_lock(&bio_mutex[type]); /* Block SIGALRM so we are sure that only the main thread will * receive the watchdog signal. */ sigemptyset(&sigset); sigaddset(&sigset, SIGALRM); if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); while(1) { listNode *ln; /* The loop always starts with the lock hold. */ if (listLength(bio_jobs[type]) == 0) { // 注意此处将会释放锁哟,以便外部可以添加任务进来 pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); continue; } /* Pop the job from the queue. */ ln = listFirst(bio_jobs[type]); job = ln->value; /* It is now possible to unlock the background system as we know have * a stand alone job structure to process.*/ pthread_mutex_unlock(&bio_mutex[type]); /* 根据任务类型执行不同的逻辑 */ if (type == BIO_CLOSE_FILE) { close(job->fd); } else if (type == BIO_AOF_FSYNC) { redis_fsync(job->fd); } else if (type == BIO_LAZY_FREE) { job->free_fn(job->free_args); } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } zfree(job); /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ pthread_mutex_lock(&bio_mutex[type]); listDelNode(bio_jobs[type],ln); bio_pending[type]--; /* Unblock threads blocked on bioWaitStepOfType() if any. */ pthread_cond_broadcast(&bio_step_cond[type]); } } ``` 关于unlink的异步操作主要涉及到这几个函数的执行,这里unlink操作删除的时候会先评估释放数据的工作量,如果较小就会直接主线程做删除操作。 这里分享的源码希望对大家有帮助,如果有兴趣进一步了解,可以私我要一下整个redis翻译后的代码。
按时间分类
2022-04-22
2022-04-10
2022-03-26
2022-03-16
2022-03-08
2022-03-04
2022-02-27
2022-02-26
2022-02-19
2022-01-30
2022-01-23
2021-11-14
2021-11-06
2021-10-07
2021-09-21
2021-09-20
2021-09-15
2021-08-22
2021-08-13
2021-07-28
2021-06-17
2021-05-22
2021-04-23
2021-04-03
2021-03-07
2021-03-05
2021-03-04
2021-02-28
2021-02-21
2021-02-20