When they're not busy with another job, the threads have to continually check for new jobs to execute. This means we need some sort of shared storage that the main thread can write to and child threads can read from. The good news is that Ruby comes with the shared storage we're looking for: `Thread::Queue`. It's perfect for us because our main thread can push jobs to it, while child threads listen for new jobs on it, with no need to poll. Calling `queue.shift` will either return the next item from the queue or, if the queue is empty, block (pause execution of the thread) until a new item is pushed to the queue.

First up, we start our threads and create the queue they will consume jobs from. Our threads use the `shift` method to wait for new jobs. You'll notice that, in our main thread, we don't push a new job to the queue if it's already at the concurrency limit (the `if` check).

This way, our worker can pick a queue and run `LPOP` to get the next job on that queue. This also means we don't need the `reserved_by` field anymore: since the worker pops the job from the list, there's no chance another worker might try to pick it up. If we put all jobs in one list instead, the worker would have to load each job in the list, check its queue to determine whether it can process it, and then either push it back to the list or delete it from the list. That is theoretically doable, but it is more complicated, highly inefficient, and not atomic, so expect to run into concurrency problems.

The other part is saving the job data when we're done. Successful jobs we can simply discard or, if we wish, push to a "successful" list. Failed jobs which are not to be retried we can push to a "dead" list. But what about jobs which failed and need to be retried at a specific time? We can't simply push them back onto the queues, because they won't be ready until that time.

In fact, we have two big pieces of missing functionality: scheduled jobs and failed jobs which need to be retried. These must be executed after a specific time, so we can't simply push them to the queue's ready list, because they're not ready yet. They probably have to be put somewhere else.

Chained jobs are a similar problem. This isn't an SQL database, so we can't find the next job by querying the `next_job_id` field. We'll need to store all information for a chain in one place.
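To make the `Thread::Queue` hand-off described above concrete, here is a minimal sketch of a main thread feeding a small pool of worker threads. It is not the post's actual worker: the `CONCURRENCY` constant, the lambda stand-in jobs, the `results` queue, and the short `sleep` used to wait at the concurrency limit are all assumptions made for illustration.

```ruby
# Hypothetical sketch: main thread pushes jobs, worker threads block on shift.
CONCURRENCY = 3              # assumed number of worker threads

queue   = Thread::Queue.new  # shared storage: main thread writes, workers read
results = Thread::Queue.new  # assumed: collects return values for inspection

workers = CONCURRENCY.times.map do
  Thread.new do
    # shift blocks while the queue is empty. Once the queue has been closed
    # and drained, shift returns nil, which ends the loop.
    while (job = queue.shift)
      results << job.call
    end
  end
end

# Stand-in jobs: each one just squares a number.
jobs = (1..10).map { |n| -> { n * n } }

jobs.each do |job|
  # Don't hand over more work while the queue is at the concurrency limit.
  sleep 0.01 while queue.size >= CONCURRENCY
  queue << job
end

queue.close            # no more jobs: lets blocked workers wake up and exit
workers.each(&:join)

processed = []
processed << results.shift until results.empty?
puts processed.sort.inspect
```

The workers never poll: they sleep inside `shift` until the main thread pushes something. Only the main thread in this sketch waits in a loop, and only when the queue is already full.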