| Class | Skynet::Job |
| In: | lib/skynet/skynet_job.rb, lib/skynet/skynet_tuplespace_server.rb |
| Parent: | Object |
Skynet::Job is the main interface to Skynet. You create a job object by giving it the starting data (map_data) and the class that contains the map/reduce methods. Even though Skynet is distributed, calling run on a plain Skynet::Job still blocks your current process until your task has completed. If you want to go on to do other things, pass :async => true when creating the job, then call job.results later to retrieve your results.
There are also many global configuration options, which can be controlled through Skynet::CONFIG.
Example Usage: Create a file called mapreduce_test.rb with the following.
```ruby
require 'ostruct' # OpenStruct is used for the sample map_data

class MapreduceTest
  include SkynetDebugger ## This gives you logging methods such as log, error, info, fatal

  # Create the job and run it; run blocks until the reduced results are ready.
  def self.run
    job = Skynet::Job.new(
      :mappers          => 2,
      :reducers         => 1,
      :map_reduce_class => self,
      :map_data         => [
        OpenStruct.new({:created_by => 2}),
        OpenStruct.new({:created_by => 2}),
        OpenStruct.new({:created_by => 3})
      ]
    )
    job.run
  end

  # Receives an Array of map_data items; emits [created_by, 1] pairs.
  def self.map(profiles)
    result = Array.new
    profiles.each do |profile|
      result << [profile.created_by, 1] if profile.created_by
    end
    result
  end

  # Receives an Array of [created_by, count] pairs; sums the counts per key.
  def self.reduce(pairs)
    totals = Hash.new
    pairs.each do |pair|
      created_by, count = pair[0], pair[1]
      totals[created_by] ||= 0
      totals[created_by] += count
    end
    totals
  end
end
```
You need to make sure Skynet is running with your class loaded. That's how Skynet works: since there is no easy way to pass code around the network, each Skynet worker must already have your code loaded. If Skynet is already running, stop it and start it again with the -r flag, which tells it which file to require.
```shell
$ skynet -r mapreduce_test.rb
```
Then open the Skynet console to test running your map/reduce task.
```
$ skynet console -r mapreduce_test.rb
skynet>> MapreduceTest.run # returns {2=>2, 3=>1}
```
In the example above, notice that self.map and self.reduce both accept Arrays. If you would rather not deal with arrays of map_data or reduce_data, include MapreduceHelper in your class and implement self.map_each and self.reduce_each instead. The self.map and self.reduce methods provided by the mixin iterate over the map_data and reduce_data, passing each element to your map_each and reduce_each methods respectively. They also handle errors inside that loop, so even if a single map or reduce call fails, processing continues with the remaining elements. If you do not want processing to continue after a map fails, do not use the MapreduceHelper mixin.
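To make the map_each pattern concrete, here is a minimal sketch of how such a mixin could work. SketchMapreduceHelper, each_safely, Profile and CreatedByCount are hypothetical names made up for this illustration; the real MapreduceHelper's code, logging and return values may differ. The assumption here is that map_each returns one result per element and nil results are dropped.

```ruby
# Hypothetical stand-in for MapreduceHelper, written only to illustrate the
# pattern described above; the real mixin's code may differ.
module SketchMapreduceHelper
  def self.included(base)
    base.extend(ClassMethods) # adds map and reduce as class methods
  end

  module ClassMethods
    def map(datas)
      each_safely(datas) { |data| map_each(data) }
    end

    def reduce(datas)
      each_safely(datas) { |data| reduce_each(data) }
    end

    private

    # Yield each element, keep non-nil results, and log (rather than
    # re-raise) any error so one bad element does not stop the loop.
    def each_safely(datas)
      datas.each_with_object([]) do |data, results|
        begin
          result = yield(data)
          results << result unless result.nil?
        rescue StandardError => e
          warn "skipping #{data.inspect}: #{e.message}"
        end
      end
    end
  end
end

Profile = Struct.new(:created_by)

class CreatedByCount
  include SketchMapreduceHelper

  # Handles exactly one element of map_data.
  def self.map_each(profile)
    raise ArgumentError, "no created_by" if profile.created_by.nil?
    [profile.created_by, 1]
  end
end

CreatedByCount.map([Profile.new(2), Profile.new(nil), Profile.new(3)])
# → [[2, 1], [3, 1]]  (the nil profile is logged and skipped)
```

The reduce side works the same way through reduce_each; it is left out of the usage to keep the sketch short.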
Because every Skynet worker must already have your code loaded, you will probably want to install Skynet into the application whose code your jobs need to run. See bin/skynet_install for more info.
See new for the many other options that control Skynet::Job settings.
| FIELDS | = | [:queue_id, :mappers, :reducers, :silent, :name, :map_timeout, :map_data, :job_id, :reduce_timeout, :master_timeout, :map_name, :reduce_name, :master_result_timeout, :result_timeout, :start_after, :solo, :single, :version, :map, :map_partitioner, :reduce, :reduce_partition, :map_reduce_class, :master_retry, :map_retry, :reduce_retry, :keep_map_tasks, :keep_reduce_tasks, :local_master, :async, :data_debug |
| data_debug | [RW] |
| use_local_queue | [RW] |
Most of the time you will merely call new(options) and then run on the returned object.
Options are:
:local_master BOOL (DEFAULT true)
By default, your Skynet::Job acts as the master for your map/reduce job: it doles out tasks, waits for other workers to complete them and return their results, and merges and partitions the data. If you call #run in async mode, another worker will act as the master for your job without blocking. If you pass :async => false, :local_master => false, Skynet will let another worker be the master for your job but will block waiting for the final results. The benefit of this is that if your process dies, the job will continue to run remotely.
:async BOOL (DEFAULT false)
If you run in async mode, another worker will act as the master for your job without blocking. You cannot pass :local_master => true, :async => true, since the only way to let your job run asynchronously is to have a remote master.
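The interplay between :local_master and :async can be summarized in a few lines. The sketch below is a hypothetical helper (resolve_master_mode is not a Skynet method); it assumes that when :async is true and :local_master is not given, the master is remote, which is consistent with the rule above but is not confirmed by it.

```ruby
# Hypothetical helper, not part of Skynet: applies the rules described
# above for :local_master and :async. Assumption: when :async is true and
# :local_master is not given, the master is remote.
def resolve_master_mode(options)
  async = options.fetch(:async, false)
  if async && options[:local_master] == true
    raise ArgumentError, ":local_master => true cannot be combined with :async => true"
  end

  local_master = async ? false : options.fetch(:local_master, true)
  { :async => async, :local_master => local_master }
end

resolve_master_mode({})                                       # blocking, local master
resolve_master_mode(:async => false, :local_master => false)  # blocking, remote master
resolve_master_mode(:async => true)                           # non-blocking, remote master
```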
:map_data Array or Enumerable
map_data should be an Array or Enumerable of data that Skynet::Job will split up and distribute among your workers. You can stream data to Skynet::Job by passing an Enumerable that implements next or each.
:map_reduce_class Class or Class Name
Skynet::Job will look for class methods named self.map, self.reduce, self.map_partitioner and self.reduce_partition in your map_reduce_class. The only required method is self.map. Each of these methods must accept an array; see the example above.
:map(Class Name)
You can pass a class name or a proc. If you pass a class name, Job will look for a method called self.map in that class. WARNING: Passing a proc does not work right now.
:reduce(Class Name)
You can pass a class name or a proc. If you pass a class name, Job will look for a method called self.reduce in that class. WARNING: Passing a proc does not work right now.
:reduce_partition(Class Name)
You can pass a class name or a proc. If you pass a class name, Job will look for a method called self.reduce_partition in that class. WARNING: Passing a proc does not work right now.
:mappers Fixnum
The number of mappers to partition map data for.
:reducers Fixnum
The number of reducers to partition the data returned by the mappers for.
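Partitioning mapper output for :reducers usually means sending every pair with the same key to the same reducer, so each reducer sees all values for its keys. The sketch below shows that standard hash-partitioning technique; partition_for_reducers is a hypothetical name, and using CRC32 of key.to_s is an assumption chosen because it is stable across processes, unlike Object#hash. It is not Skynet's documented default partitioner.

```ruby
require 'zlib'

# Illustrative hash partitioner: routes every [key, value] pair with the
# same key to the same reducer bucket. CRC32 of key.to_s is an assumption,
# not Skynet's documented scheme; it keeps routing stable across processes.
def partition_for_reducers(pairs, reducers)
  buckets = Array.new(reducers) { [] }
  pairs.each do |key, value|
    buckets[Zlib.crc32(key.to_s) % reducers] << [key, value]
  end
  buckets
end

map_output = [[2, 1], [3, 1], [2, 1]]
partition_for_reducers(map_output, 2) # both [2, 1] pairs land in the same bucket
```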
:master_retry Fixnum
If the master fails for any reason, how many times should it be retried? You can also set Skynet::CONFIG[:DEFAULT_MASTER_RETRY] (DEFAULT 0)
:map_retry Fixnum
If a map task fails for any reason, how many times should it be retried? You can also set Skynet::CONFIG[:DEFAULT_MAP_RETRY] (DEFAULT 3)
:reduce_retry Fixnum
If a reduce task fails for any reason, how many times should it be retried? You can also set Skynet::CONFIG[:DEFAULT_REDUCE_RETRY] (DEFAULT 3)
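The three retry options follow the same pattern: a per-job value overrides a global default. The sketch below models that with a hypothetical SKETCH_CONFIG hash standing in for Skynet::CONFIG (using the documented defaults) and a hypothetical with_retries helper. It assumes a retry count of N means one initial attempt plus up to N more; Skynet may count attempts differently.

```ruby
# Hypothetical stand-in for Skynet::CONFIG with the documented defaults.
SKETCH_CONFIG = {
  DEFAULT_MASTER_RETRY: 0,
  DEFAULT_MAP_RETRY: 3,
  DEFAULT_REDUCE_RETRY: 3
}.freeze

# The per-job option (e.g. :map_retry) wins; otherwise use the global default.
def retries_for(kind, options)
  options.fetch(:"#{kind}_retry") { SKETCH_CONFIG.fetch(:"DEFAULT_#{kind.to_s.upcase}_RETRY") }
end

# Run the block once, then up to `retries` more times if it raises.
def with_retries(retries)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    retry if attempts <= retries
    raise
  end
end

retries_for(:map, {})                    # → 3
retries_for(:map, { :map_retry => 1 })   # → 1
```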
:master_timeout, :map_timeout, :reduce_timeout, :master_result_timeout, :result_timeout
These control how long Skynet waits for particular actions to finish. master_timeout controls how long the master waits for ALL map/reduce tasks, i.e., for the entire job to finish. master_result_timeout controls how long the final result waits in the queue before it expires. map_timeout and reduce_timeout control how long individual map and reduce tasks should take.
:single BOOL
By default, the master task distributes the map and reduce tasks to other workers. In single mode, the master performs the map and reduce tasks itself. This is handy when you just want to perform a single action asynchronously; in that case you are merely using Skynet to postpone some action. In single mode, the first worker that picks up your task simply completes it instead of trying to distribute it to other workers.
:start_after Time or Time.to_i
Do not start the job until the :start_after time has passed.
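Because :start_after accepts either a Time or Time.to_i, both forms can be normalized with to_i before comparing against the current time. ready_to_start? is a hypothetical helper for illustration, not a Skynet method.

```ruby
# Illustrative check: accepts a Time or an Integer epoch (Time#to_i) and
# reports whether a job with this :start_after may start yet.
def ready_to_start?(start_after, now = Time.now)
  return true if start_after.nil? # no delay requested

  now.to_i >= start_after.to_i # Time#to_i and Integer#to_i both give epoch seconds
end

ready_to_start?(Time.now + 3600)  # → false, one hour in the future
ready_to_start?(Time.now.to_i - 1) # → true
```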
:queue String
Which queue should this Job go into? The queue provided is merely used to determine the queue_id. Queues are defined in Skynet::CONFIG[:MESSAGE_QUEUES].
:queue_id Fixnum (DEFAULT 0)
Which queue should this Job go into? Queues are defined in Skynet::CONFIG[:MESSAGE_QUEUES].
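The relationship between :queue and :queue_id can be sketched as a name-to-index lookup. This assumes Skynet::CONFIG[:MESSAGE_QUEUES] is an Array of queue names whose index is the queue_id (consistent with queue_id defaulting to 0, but not confirmed here), and that :queue takes precedence when both are given. SKETCH_MESSAGE_QUEUES and queue_id_for are hypothetical.

```ruby
# Hypothetical stand-in for Skynet::CONFIG[:MESSAGE_QUEUES]. Assumption:
# it is an Array of queue names and a queue's id is its index.
SKETCH_MESSAGE_QUEUES = ["default", "reports", "mailers"].freeze

# :queue (a name) wins if given; otherwise use :queue_id, defaulting to 0.
def queue_id_for(options)
  name = options[:queue]
  return options.fetch(:queue_id, 0) if name.nil?

  SKETCH_MESSAGE_QUEUES.index(name) ||
    raise(ArgumentError, "unknown queue #{name.inspect}")
end

queue_id_for({})                     # → 0
queue_id_for(:queue => "reports")    # → 1
```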
:solo BOOL
One normally turns on solo mode in Skynet::Config by setting Skynet::CONFIG[:SOLO] = true.
In solo mode, Skynet jobs do not add items to a Skynet queue; instead, they do all work in place. It's like a Skynet simulation mode: it completes all tasks without Skynet running, which is great for testing. You can also wrap code blocks in Skynet.solo {} to run that code in solo mode.
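To show what "work in place" means for the MapreduceTest example at the top of this page, the sketch below runs the same map and reduce logic directly in one process, with no queue or workers. It is not Skynet's solo implementation (it skips partitioning and everything else Skynet does); LocalMapreduceTest, SampleProfile and run_in_process are hypothetical names.

```ruby
# Not Skynet's implementation: a hypothetical in-process run of the example
# class's map and reduce, showing what the MapreduceTest job computes.
SampleProfile = Struct.new(:created_by)

class LocalMapreduceTest
  # Same logic as MapreduceTest.map: one [created_by, 1] pair per profile.
  def self.map(profiles)
    profiles.each_with_object([]) do |profile, result|
      result << [profile.created_by, 1] if profile.created_by
    end
  end

  # Same logic as MapreduceTest.reduce: sum the counts per created_by.
  def self.reduce(pairs)
    pairs.each_with_object(Hash.new(0)) do |(created_by, count), totals|
      totals[created_by] += count
    end
  end
end

def run_in_process(klass, map_data)
  klass.reduce(klass.map(map_data))
end

run_in_process(LocalMapreduceTest,
               [SampleProfile.new(2), SampleProfile.new(2), SampleProfile.new(3)])
# returns a Hash equal to {2 => 2, 3 => 1}
```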
:version Fixnum
If you do not provide a version, the current worker version will be used. Skynet workers start at a specific version and only look for jobs that match that version. A worker keeps looking for jobs at that version until there are no more jobs left on the queue for it. At that point, the worker checks whether there is a new version. If there is, it restarts itself at the new version (assuming you have already pushed the new code to those workers). To get, set, or increment the current worker version, see Skynet::Job.get_worker_version, Skynet::Job.set_worker_version and Skynet::Job.increment_worker_version.
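The worker-version rule above can be modeled as a small decision function. This is a hypothetical simulation for illustration; next_worker_action and its return symbols are made up, and Skynet's actual worker loop is more involved.

```ruby
# Hypothetical simulation of the version rule described above; names and
# return values are illustrative, not Skynet's worker code.
# queued_versions: versions of the jobs currently waiting on the queue.
def next_worker_action(worker_version, queued_versions, current_version)
  if queued_versions.include?(worker_version)
    :take_job               # keep draining jobs at this worker's version
  elsif current_version > worker_version
    :restart_at_new_version # no work left at the old version, upgrade
  else
    :wait                   # up to date and nothing to do
  end
end

next_worker_action(1, [1, 2], 2) # → :take_job
next_worker_action(1, [2], 2)    # → :restart_at_new_version
```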
:name, :map_name, :reduce_name
These names are merely for debugging while watching the Skynet logs or the Skynet queue. If you do not supply names, Skynet will try to provide sensible ones based on your class names.
:keep_map_tasks BOOL or Fixnum (DEFAULT 1)
If true, the master will run the map tasks locally. If a number is provided, the master will run the map tasks locally if there are LESS THAN OR EQUAL TO that many of them. You may also set Skynet::CONFIG[:DEFAULT_KEEP_MAP_TASKS] (DEFAULT 1).
:keep_reduce_tasks BOOL or Fixnum (DEFAULT 1)
If true, the master will run the reduce tasks locally. If a number is provided, the master will run the reduce tasks locally if there are LESS THAN OR EQUAL TO that many of them. You may also set Skynet::CONFIG[:DEFAULT_KEEP_REDUCE_TASKS] (DEFAULT 1).
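Both :keep_map_tasks and :keep_reduce_tasks reduce to the same decision, sketched below as a hypothetical helper (run_tasks_locally? is not a Skynet method). It treats any value other than true or an Integer as "distribute"; how Skynet handles other values is an assumption.

```ruby
# Illustrative reading of :keep_map_tasks / :keep_reduce_tasks:
# true keeps all tasks local; an Integer keeps them local only when there
# are at most that many; anything else (false, nil) distributes them.
def run_tasks_locally?(keep_setting, task_count)
  case keep_setting
  when true    then true
  when Integer then task_count <= keep_setting
  else false
  end
end

run_tasks_locally?(1, 1)     # → true, a single task stays with the master
run_tasks_locally?(1, 4)     # → false, four tasks go to other workers
run_tasks_locally?(true, 50) # → true
```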
async is true only if the async flag is set and the job is neither a 'single' job nor running in solo mode. async only determines whether the master runs locally and whether we poll for the result.
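The async rule reduces to a one-line predicate. The sketch below uses a hypothetical JobFlags struct and effectively_async? helper in place of the job's real attributes; it follows the reading that single and solo mode each turn async off.

```ruby
# Hypothetical stand-in for the job's flags, used only to show the rule.
JobFlags = Struct.new(:async, :single, :solo, keyword_init: true)

# True only when :async is set and the job is neither single nor solo.
def effectively_async?(flags)
  !!flags.async && !flags.single && !flags.solo
end

effectively_async?(JobFlags.new(async: true))               # → true
effectively_async?(JobFlags.new(async: true, single: true)) # → false
```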
Options are:
:local_master BOOL (DEFAULT true)
By default, your Skynet::Job acts as the master for your map/reduce job: it doles out tasks, waits for other workers to complete them and return their results, and merges and partitions the data. If you run in async mode, another worker will act as the master for your job without blocking. If you pass :async => false, :local_master => false, Skynet will let another worker be the master for your job but will block waiting for the final results. The benefit of this is that if your process dies, the job will continue to run remotely.
:async BOOL (DEFAULT false)
If you run in async mode, another worker will act as the master for your job without blocking. You cannot pass :local_master => true, :async => true, since the only way to let your job run asynchronously is to have a remote master. You can pass any options you might pass to Skynet::Job.new. Warning: passing options to run permanently changes the properties of the job.
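The warning about passing options to run is easy to trip over, so here is a hypothetical illustration of the effect. SketchJob is a made-up class that merges run options into its own settings, mirroring the behavior the warning describes; it is not Skynet::Job's implementation, and its return values are placeholders.

```ruby
# Hypothetical job class illustrating the warning above: options passed to
# run are merged into the job's own settings, so they outlive that call.
class SketchJob
  attr_reader :options

  def initialize(options = {})
    @options = { :async => false }.merge(options)
  end

  def run(options = {})
    @options.merge!(options) # permanent change, as the warning describes
    @options[:async] ? :queued : :ran_locally
  end
end

job = SketchJob.new(:mappers => 2)
job.run(:async => true) # → :queued
job.run                 # → :queued again, because :async => true stuck
```

If you need a one-off variation, creating a fresh job with Skynet::Job.new avoids carrying options over to later runs.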