Class Skynet::Job
In: lib/skynet/skynet_job.rb
lib/skynet/skynet_tuplespace_server.rb
Parent: Object

Skynet::Job is the main interface to Skynet. You create a job object by giving it the starting data (:map_data) and the class that contains your map/reduce methods (:map_reduce_class). Even though Skynet is distributed, calling run on a plain Skynet::Job still blocks your current process until your task has completed. If you want to do other work in the meantime, pass :async => true when creating the job, then call job.results later to retrieve your results.

There are also many global configuration options, which can be controlled through Skynet::CONFIG.

Example Usage: Create a file called mapreduce_test.rb with the following.

  require 'ostruct'   ## OpenStruct is used for the sample map_data below

  class MapreduceTest
    include SkynetDebugger   ## This gives you logging methods such as log, error, info, fatal

    def self.run
      job = Skynet::Job.new(
        :mappers          => 2,
        :reducers         => 1,
        :map_reduce_class => self,
        :map_data         => [OpenStruct.new({:created_by => 2}),OpenStruct.new({:created_by => 2}),OpenStruct.new({:created_by => 3})]
      )
      results = job.run
    end

    def self.map(profiles)
      result = Array.new
      profiles.each do |profile|
        result << [profile.created_by, 1] if profile.created_by
      end
      result
    end

    def self.reduce(pairs)
      totals = Hash.new
      pairs.each do |pair|
        created_by, count = pair[0], pair[1]
        totals[created_by] ||= 0
        totals[created_by] += count
      end
      totals
    end
  end

You need to make sure Skynet is running with your class loaded; that's how Skynet works. Since there is no easy way to pass code around the network, each Skynet worker must already have your code loaded. If Skynet is already running, stop it and start it again with the -r flag, which tells it which file to require to find your class.

   $ skynet -r mapreduce_test.rb

Then go into the skynet console to test running your map reduce task.

   $ skynet console -r mapreduce_test.rb
   skynet>> MapreduceTest.run     # returns {2=>2, 3=>1}
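Because map and reduce are plain class methods, one way to sanity-check them before starting any workers is to call them directly in plain Ruby. The sketch below only approximates what the master does (split map_data into chunks, map each chunk, hand all pairs to reduce); it does not load Skynet and is not its scheduling code. MapreduceDryRun is a hypothetical standalone copy of MapreduceTest without SkynetDebugger, and a Struct stands in for the OpenStruct records.

```ruby
# Hypothetical standalone copy of MapreduceTest's map and reduce, without
# SkynetDebugger, so it runs in plain Ruby with no Skynet workers.
Profile = Struct.new(:created_by)

class MapreduceDryRun
  # Emit a [created_by, 1] pair for every profile that has a creator.
  def self.map(profiles)
    profiles.select(&:created_by).map { |profile| [profile.created_by, 1] }
  end

  # Sum the counts for each created_by key.
  def self.reduce(pairs)
    pairs.each_with_object(Hash.new(0)) do |(created_by, count), totals|
      totals[created_by] += count
    end
  end
end

map_data = [Profile.new(2), Profile.new(2), Profile.new(3)]

# Roughly mimic :mappers => 2 by splitting map_data into two chunks (2 + 1 items).
mappers    = 2
chunk_size = (map_data.size.to_f / mappers).ceil
map_output = map_data.each_slice(chunk_size).flat_map { |chunk| MapreduceDryRun.map(chunk) }

# :reducers => 1, so every pair goes to a single reduce call.
totals = MapreduceDryRun.reduce(map_output)
# totals now equals { 2 => 2, 3 => 1 }, matching the console result above
```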

In the example above, you might notice that self.map and self.reduce both accept Arrays. If you do not want to deal with arrays of map_data or reduce_data, you can include MapreduceHelper in your class and implement self.map_each and self.reduce_each instead. The self.map and self.reduce methods that MapreduceHelper provides iterate over the map_data and reduce_data, passing each element to your map_each and reduce_each methods respectively. They also handle errors inside that loop, so even if a single map or reduce fails, processing continues. If you want processing to stop when a map fails, do not use the MapreduceHelper mixin.
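The per-element iteration described above can be sketched as a small mixin. EachHelper is a hypothetical stand-in, not Skynet's MapreduceHelper source; in particular, how the real helper reports errors and shapes its return value is an assumption here (this sketch warns on stderr and collects the successful results).

```ruby
# Hypothetical stand-in for the iteration MapreduceHelper is described as
# providing; not the actual Skynet source.
module EachHelper
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def map(map_data)
      each_safely(map_data, :map_each)
    end

    def reduce(reduce_data)
      each_safely(reduce_data, :reduce_each)
    end

    private

    # Calls the named per-element method on every item. An element that raises
    # is reported on stderr and skipped, so one failure does not stop the loop.
    def each_safely(items, method_name)
      items.each_with_object([]) do |item, results|
        begin
          results << send(method_name, item)
        rescue StandardError => e
          warn "#{method_name} failed for #{item.inspect}: #{e.message}"
        end
      end
    end
  end
end

# Hypothetical example class: only map_each is implemented.
class WordLength
  include EachHelper

  def self.map_each(word)
    raise ArgumentError, "not a string" unless word.is_a?(String)
    [word, word.length]
  end
end

pairs = WordLength.map(["skynet", 42, "job"])
# pairs is [["skynet", 6], ["job", 3]]; the bad element 42 is skipped
```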

Since Skynet must have your code, you will probably want to install Skynet into the application whose code your jobs depend on. See bin/skynet_install for more info.

See Skynet::Job.new for the many other options that control Skynet::Job settings.

Included Modules

SkynetDebugger, Skynet::GuidGenerator

Classes and Modules

Class Skynet::Job::BadMapOrReduceError
Class Skynet::Job::Error
Class Skynet::Job::LocalMessageQueue
Class Skynet::Job::WorkerError

Constants

FIELDS = [:queue_id, :mappers, :reducers, :silent, :name, :map_timeout, :map_data, :job_id, :reduce_timeout, :master_timeout, :map_name, :reduce_name, :master_result_timeout, :result_timeout, :start_after, :solo, :single, :version, :map, :map_partitioner, :reduce, :reduce_partition, :map_reduce_class, :master_retry, :map_retry, :reduce_retry, :keep_map_tasks, :keep_reduce_tasks, :local_master, :async, :data_debug]

Attributes

data_debug  [RW] 
use_local_queue  [RW] 

Public Class methods

Most of the time you will merely call new(options) and then run on the returned object.

Options are:

:local_master BOOL (DEFAULT true)

  By default, your Skynet::Job acts as the master for your map/reduce job: it doles out
  tasks, waits for other workers to complete them and return their results, and merges
  and partitions the data. If you call #run in async mode, another worker acts as the
  master for your job and your process does not block. If you pass :async => false,
  :local_master => false, Skynet lets another worker be the master for your job but blocks
  waiting for the final results. The benefit is that if your process dies, the job
  continues to run remotely.

:async BOOL (DEFAULT false)

  If you run in async mode, another worker acts as the master for your job and your process does not block.
  You cannot pass :local_master => true, :async => true, since the only way to let your
  job run asynchronously is to have a remote master.
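The :local_master / :async rules above can be restated as a small decision function. master_mode is hypothetical and not part of Skynet. It assumes that :local_master falls back to false when :async is set (and to the documented default of true otherwise), since the two cannot both be true.

```ruby
# Hypothetical helper restating the :local_master / :async rules; not Skynet's API.
def master_mode(options = {})
  async = options.fetch(:async, false)
  # Assumption: :local_master defaults to true only when :async is not set.
  local_master = options.fetch(:local_master, !async)

  if async && local_master
    raise ArgumentError, ":async => true requires a remote master (:local_master => false)"
  end

  if local_master
    :local_blocking   # this process is the master and blocks until the job is done
  elsif async
    :remote_async     # another worker is the master; nothing blocks
  else
    :remote_blocking  # another worker is the master; this process waits for results
  end
end

master_mode                       # => :local_blocking
master_mode(async: true)          # => :remote_async
master_mode(local_master: false)  # => :remote_blocking
```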

:map_data(Array or Enumerable)

   map_data should be an Array or Enumerable containing the data that Skynet::Job will split up
   and distribute among your workers. You can stream data to Skynet::Job by passing
   an Enumerable that implements next or each.
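As an illustration of "an Enumerable that implements each", here is a hypothetical data source that yields records one at a time instead of building a full Array. RangeSource and its record shape are invented for the example, and the chunk size of 3 is arbitrary; how Skynet itself sizes chunks is not shown here.

```ruby
# Hypothetical streaming source: yields one record at a time. Including
# Enumerable and defining each gives it each_slice, map, etc. for free.
class RangeSource
  include Enumerable

  def initialize(count)
    @count = count
  end

  def each
    # Without a block, return an external Enumerator, which supports next.
    return enum_for(:each) unless block_given?
    @count.times { |i| yield({ id: i, created_by: i % 3 }) }
  end
end

source = RangeSource.new(7)

# External iteration via next.
enum  = source.each
first = enum.next   # {id: 0, created_by: 0}

# each_slice pulls records from the source in fixed-size chunks.
chunks = source.each_slice(3).map { |chunk| chunk.map { |record| record[:id] } }
# chunks is [[0, 1, 2], [3, 4, 5], [6]]
```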

:map_reduce_class(Class or Class Name)

  Skynet::Job looks for class methods named self.map, self.reduce, self.map_partitioner and
  self.reduce_partition in your map_reduce_class. The only required method is self.map.
  Each of these methods must accept an array; see the MapreduceTest example above.

:map(Class Name)

  You can pass a classname, or a proc.  If you pass a classname, Job will look for a method
  called self.map in that class.
  WARNING: Passing a proc does not work right now.

:reduce(Class Name)

  You can pass a classname, or a proc.  If you pass a classname, Job will look for a method
  called self.reduce in that class.
  WARNING: Passing a proc does not work right now.

:reduce_partition(Class Name)

  You can pass a classname, or a proc.  If you pass a classname, Job will look for a method
  called self.reduce_partition in that class.
  WARNING: Passing a proc does not work right now.

:mappers Fixnum

  The number of mappers to partition map data for.

:reducers Fixnum

  The number of reducers to partition the results returned by the map tasks for.
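How Skynet's default partitioner assigns map output to reducers is not described here. A common MapReduce technique is hash partitioning by key, so that every pair sharing a key reaches the same reducer. The sketch below shows that technique with a hypothetical partition_for_reducers method; it is not necessarily what Skynet does.

```ruby
require 'zlib'

# Hypothetical hash partitioner: buckets [key, value] pairs for `reducers`
# reducers so that every pair sharing a key lands in the same bucket.
# Zlib.crc32 is used instead of Object#hash because #hash is seeded per
# process, so the same key could map to different buckets on different machines.
def partition_for_reducers(pairs, reducers)
  buckets = Array.new(reducers) { [] }
  pairs.each do |key, value|
    buckets[Zlib.crc32(key.to_s) % reducers] << [key, value]
  end
  buckets
end

pairs   = [[2, 1], [3, 1], [2, 1], [5, 1]]
buckets = partition_for_reducers(pairs, 2)
# Both [2, 1] pairs end up in the same bucket; no pair is lost.
```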

:master_retry Fixnum

  If the master fails for any reason, how many times should it be retried?  You can also set
  Skynet::CONFIG[:DEFAULT_MASTER_RETRY] (DEFAULT 0)

:map_retry Fixnum

  If a map task fails for any reason, how many times should it be retried?  You can also set
  Skynet::CONFIG[:DEFAULT_MAP_RETRY] (DEFAULT 3)

:reduce_retry Fixnum

  If a reduce task fails for any reason, how many times should it be retried?  You can also set
  Skynet::CONFIG[:DEFAULT_REDUCE_RETRY] (DEFAULT 3)
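The retry options count retries after the first attempt, so :map_retry => 3 allows up to four attempts in total. The hypothetical with_retries helper below illustrates only that counting; Skynet's real retry bookkeeping happens through its queue and is not shown.

```ruby
# Hypothetical sketch of retry counting: run the task once, and if it raises,
# retry it up to `retries` more times before re-raising the last error.
def with_retries(retries)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    retry if attempts <= retries
    raise
  end
end

calls = 0
result = with_retries(3) do |attempt|
  calls += 1
  raise "transient failure" if attempt < 3
  "done on attempt #{attempt}"
end
# result is "done on attempt 3"; calls is 3
```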

:master_timeout, :map_timeout, :reduce_timeout, :master_result_timeout, :result_timeout

  These control how long Skynet waits for particular actions to finish.
  The master_timeout controls how long the master waits for ALL map/reduce tasks, i.e. the entire job, to finish.
  The master_result_timeout controls how long the final result waits in the queue before it expires.
  The map and reduce timeouts control how long individual map and reduce tasks should take.
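To illustrate the idea of bounding a single task, the sketch below uses Ruby's stdlib Timeout module. run_with_timeout is hypothetical; Skynet enforces its timeouts through its own queue logic, which this does not reproduce.

```ruby
require 'timeout'

# Hypothetical per-task limit in the spirit of :map_timeout: return the
# block's value, or :timed_out if it runs longer than `seconds`.
def run_with_timeout(seconds)
  Timeout.timeout(seconds) { yield }
rescue Timeout::Error
  :timed_out
end

fast = run_with_timeout(1) { :finished }
slow = run_with_timeout(0.1) { sleep 1; :finished }
# fast is :finished, slow is :timed_out
```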

:single BOOL

  By default the master task distributes the map and reduce tasks to other workers.
  In single mode the master performs the map and reduce tasks itself.
  This is handy when you just want to perform a single action asynchronously;
  in that case you're merely using Skynet to postpone the action. In single mode, the
  first worker that picks up your task simply completes it instead of trying to distribute
  it to other workers.

:start_after Time or Time.to_i

  Do not start the job until :start_after has passed.
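Since :start_after accepts either a Time or an Integer (Time#to_i), a scheduler has to normalize both forms before comparing. ready_to_start? is a hypothetical helper, and treating the exact start second as "passed" (>=) is an assumption.

```ruby
# Hypothetical helper: accept a Time or an Integer epoch, as :start_after
# does, and decide whether the job may start at `now`.
def ready_to_start?(start_after, now = Time.now)
  return true if start_after.nil?
  start_at = start_after.is_a?(Time) ? start_after.to_i : Integer(start_after)
  now.to_i >= start_at   # assumption: the exact second counts as "passed"
end

now = Time.at(1_000)
ready_to_start?(Time.at(900), now)  # => true
ready_to_start?(1_100, now)         # => false
```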

:queue String

  Which queue should this Job go into? The queue name provided is only used to
  determine the queue_id.
  Queues are defined in Skynet::CONFIG[:MESSAGE_QUEUES]

:queue_id Fixnum (DEFAULT 0)

  Which queue should this Job go into?
  Queues are defined in Skynet::CONFIG[:MESSAGE_QUEUES]
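A sketch of how a :queue name could resolve to a :queue_id. This assumes Skynet::CONFIG[:MESSAGE_QUEUES] is an Array of queue names whose index is the queue_id; check that against your Skynet version. The MESSAGE_QUEUES constant, its queue names, and queue_id_for are hypothetical.

```ruby
# Assumption: MESSAGE_QUEUES stands in for Skynet::CONFIG[:MESSAGE_QUEUES],
# modeled as an Array whose index is the queue_id. Names are hypothetical.
MESSAGE_QUEUES = ["default", "reports", "mailers"]

def queue_id_for(queue_name, queues = MESSAGE_QUEUES)
  index = queues.index(queue_name)
  raise ArgumentError, "unknown queue #{queue_name.inspect}" unless index
  index
end

queue_id_for("default")  # => 0, matching the :queue_id default
queue_id_for("reports")  # => 1
```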

:solo BOOL

  One normally turns solo mode on with Skynet::CONFIG[:SOLO] = true.
  In solo mode, Skynet jobs do not add items to a Skynet queue; instead they do all
  work in place. It's like a Skynet simulation mode: it completes all tasks
  without Skynet running, which is great for testing. You can also wrap code blocks in
  Skynet.solo {} to run that code in solo mode.
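A block-scoped switch like Skynet.solo {} is typically built by setting a flag, yielding, and restoring the previous value in an ensure clause. The sketch below shows that pattern only. SETTINGS is a hypothetical stand-in for Skynet::CONFIG, and this is not Skynet's implementation of solo.

```ruby
# Hypothetical stand-in for Skynet::CONFIG.
SETTINGS = { SOLO: false }

# Turn solo mode on for the duration of the block and restore the previous
# value afterwards, even if the block raises.
def solo
  previous = SETTINGS[:SOLO]
  SETTINGS[:SOLO] = true
  yield
ensure
  SETTINGS[:SOLO] = previous
end

inside = nil
solo { inside = SETTINGS[:SOLO] }
# inside is true; SETTINGS[:SOLO] is false again after the block
```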

:version Fixnum

  If you do not provide a version the current worker version will be used.
  Skynet workers start at a specific version and only look for jobs that match that version.
  A worker will continue looking for jobs at that version until there are no more jobs left on
  the queue for that version.  At that time, the worker will check to see if there is a new version.
  If there is, it will restart itself at the new version (assuming you have already pushed the
  new code to those workers).
  To retrieve, set or increment the current version, see
  Skynet::Job.get_worker_version, Skynet::Job.set_worker_version and Skynet::Job.increment_worker_version.

:name, :map_name, :reduce_name

  These names are only for debugging while watching the Skynet logs or the Skynet queue.
  If you do not supply names, Skynet will try to provide sensible ones based on your class names.

:keep_map_tasks BOOL or Fixnum (DEFAULT 1)

  If true, the master runs the map tasks locally.
  If a number is provided, the master runs the map tasks locally when the number of map tasks
  is LESS THAN OR EQUAL TO the number provided.
  You may also set Skynet::CONFIG[:DEFAULT_KEEP_MAP_TASKS] (DEFAULT 1)

:keep_reduce_tasks BOOL or Fixnum (DEFAULT 1)

  If true, the master runs the reduce tasks locally.
  If a number is provided, the master runs the reduce tasks locally when the number of reduce tasks
  is LESS THAN OR EQUAL TO the number provided.
  You may also set Skynet::CONFIG[:DEFAULT_KEEP_REDUCE_TASKS] (DEFAULT 1)
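The :keep_map_tasks / :keep_reduce_tasks rule reduces to a small predicate. run_tasks_locally? is hypothetical, and treating false or nil as "always distribute" is an assumption, since only true and numbers are described above.

```ruby
# Hypothetical predicate for :keep_map_tasks / :keep_reduce_tasks:
# true keeps every task on the master; an Integer n keeps them only when
# there are n or fewer tasks; false/nil (assumed) always distributes them.
def run_tasks_locally?(keep_setting, task_count)
  case keep_setting
  when true    then true
  when Integer then task_count <= keep_setting
  else false
  end
end

run_tasks_locally?(1, 1)      # => true  (the default of 1 keeps a single task local)
run_tasks_locally?(1, 4)      # => false
run_tasks_locally?(true, 50)  # => true
```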

Given a job_id, returns the results from the message queue. Used to retrieve the results of asynchronous jobs.

Public Instance methods

async is true if the async flag is set and the job is neither a 'single' job nor running in solo mode. async only determines whether the master runs locally and whether we poll for the result.
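Restated as a predicate, the rule above looks like the following. effective_async? is a hypothetical name for illustration, not the method Skynet defines.

```ruby
# Hypothetical restatement of the async rule: the flag only takes effect
# when the job is neither a single job nor running in solo mode.
def effective_async?(async:, single: false, solo: false)
  !!(async && !single && !solo)
end

effective_async?(async: true)                # => true
effective_async?(async: true, single: true)  # => false
effective_async?(async: true, solo: true)    # => false
```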

Returns the final results of this map/reduce job. If results is called on an :async job, it blocks until results are found or the master_timeout is reached.
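"Block until results are found or the timeout is reached" is a polling loop with a deadline. In the sketch below, wait_for_result is hypothetical and the lookup block stands in for Skynet's message-queue read. It returns nil on timeout, which may differ from what Skynet's results does.

```ruby
# Hypothetical polling loop: call the lookup block until it returns non-nil
# or `timeout` seconds pass. A monotonic clock avoids wall-clock jumps.
def wait_for_result(timeout, interval: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    result = yield
    return result unless result.nil?
    return nil if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep interval
  end
end

final    = { 2 => 2, 3 => 1 }
attempts = 0
found    = wait_for_result(1) { (attempts += 1) >= 3 ? final : nil }
missing  = wait_for_result(0.1) { nil }
# found is final after three lookups; missing is nil
```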

Options are:

:local_master BOOL (DEFAULT true)

  By default, your Skynet::Job acts as the master for your map/reduce job: it doles out
  tasks, waits for other workers to complete them and return their results, and merges
  and partitions the data. If you run in async mode, another worker acts as the master
  for your job and your process does not block. If you pass :async => false,
  :local_master => false, Skynet lets another worker be the master for your job but blocks
  waiting for the final results. The benefit is that if your process dies, the job
  continues to run remotely.

:async BOOL (DEFAULT false)

  If you run in async mode, another worker acts as the master for your job and your process does not block.
  You cannot pass :local_master => true, :async => true, since the only way to let your
  job run asynchronously is to have a remote master.

  You can pass any options you might pass to Skynet::Job.new.  Warning: Passing options to run
  will permanently change properties of the job.
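The warning above means options given to run persist on the job object. The hypothetical SketchJob class below is not Skynet's code; it only illustrates the consequence, assuming run merges its options into the job's own settings.

```ruby
# Hypothetical class illustrating persistent option changes; not Skynet's code.
class SketchJob
  attr_reader :options

  def initialize(options = {})
    @options = { mappers: 2, reducers: 1 }.merge(options)
  end

  def run(options = {})
    @options.merge!(options)   # permanent change, as the warning describes
    "ran with #{@options[:mappers]} mappers"
  end
end

job = SketchJob.new
job.run(mappers: 5)  # => "ran with 5 mappers"
job.run              # => "ran with 5 mappers" (the earlier override persisted)
```

If you need a one-off variation, creating a new job with those options avoids changing an existing one.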
