Start, stop, and monitor distributed parallel sorting (dSort)

For background and an in-depth presentation, please see this document.

Usage

ais dsort [SRC_BUCKET] [DST_BUCKET] --spec [JSON_SPECIFICATION|YAML_SPECIFICATION|-] [command options]

$ ais dsort --help
NAME:
   ais dsort - (alias for "job start dsort") start dsort job
   Required parameters:
      - input_bck: source bucket (used as both source and destination if the latter not specified)
      - input_format: (see docs and examples below)
      - output_format: (ditto)
      - output_shard_size: (as the name implies)
   E.g. inline JSON spec:
      $ ais start dsort '{
          "extension": ".tar",
          "input_bck": {"name": "dsort-testing"},
          "input_format": {"template": "shard-{0..9}"},
          "output_shard_size": "200KB",
          "description": "pack records into categorized shards",
          "ekm_file": "http://website.web/static/ekm_file.txt",
          "ekm_file_sep": " "
      }'
   E.g. inline YAML spec:
      $ ais start dsort -f - <<EOM
      extension: .tar
      input_bck:
        name: dsort-testing
      input_format:
        template: shard-{0..9}
      output_format: new-shard-{0000..1000}
      output_shard_size: 10KB
      description: shuffle shards from 0 to 9
      algorithm:
        kind: shuffle
      EOM
   Tip: use '--dry-run' to see the results without making any changes
   Tip: use '--verbose' to print the spec (with all its parameters including applied defaults)
   See also: docs/dsort.md, docs/cli/dsort.md, and ais/test/scripts/dsort*

USAGE:
   ais dsort [SRC_BUCKET] [DST_BUCKET] --spec [JSON_SPECIFICATION|YAML_SPECIFICATION|-] [command options]

OPTIONS:
   --spec value, -f value   path to JSON or YAML request specification
   --verbose, -v            verbose
   --help, -h               show help

Example

This example runs the ais/test/scripts/dsort-ex1-spec.json specification. The source and destination buckets - ais://src and ais://dst, respectively - must exist.

Further, the source bucket must contain at least 10 shards with names that match input_format (see below).

Notice the -v (--verbose) switch as well.

$ ais start dsort ais://src ais://dst -f ais/test/scripts/dsort-ex1-spec.json --verbose
PROPERTY                         VALUE
algorithm.content_key_type       -
algorithm.decreasing             false
algorithm.extension              -
algorithm.kind                   alphanumeric
algorithm.seed                   -
create_concurrency_max_limit     0
description                      sort shards alphanumerically
dry_run                          false
dsorter_type                     -
extension                        .tar
extract_concurrency_max_limit    0
input_bck                        ais://src
input_format.objnames            -
input_format.template            shard-{0..9}
max_mem_usage                    -
ekm_file                         -
ekm_file_sep                     \t
output_bck                       ais://dst
output_format                    new-shard-{0000..1000}
output_shard_size                10KB

Config override:                 none

srt-M8ld-VU_i

Generate Shards

ais archive gen-shards "BUCKET/TEMPLATE.EXT"

Put randomly generated shards into a bucket. The main use case for this command is dSort testing. Further reference for this command can be found here.
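For local experimentation without a cluster, an equivalent set of shards can also be produced with a short script. Below is a minimal Python sketch (illustrative only; the directory, shard, and file names are arbitrary, whereas the actual gen-shards command creates shards directly in a bucket):

```python
import io
import os
import random
import string
import tarfile

def gen_shards(dest_dir, num_shards=10, files_per_shard=5, file_size=1024):
    """Create shard-0.tar .. shard-<n-1>.tar, each holding random text files."""
    os.makedirs(dest_dir, exist_ok=True)
    for i in range(num_shards):
        path = os.path.join(dest_dir, f"shard-{i}.tar")
        with tarfile.open(path, "w") as tar:
            for j in range(files_per_shard):
                # random payload of the requested size
                data = "".join(random.choices(string.ascii_lowercase, k=file_size)).encode()
                info = tarfile.TarInfo(name=f"file-{i}-{j}.txt")
                info.size = len(data)
                tar.addfile(info, io.BytesIO(data))

gen_shards("/tmp/dsort-testing")
```

The resulting shard-0.tar through shard-9.tar match the input_format template shard-{0..9} used throughout the examples below.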

Start dSort job

ais start dsort --spec JOB_SPEC or ais start dsort -f <PATH_TO_JOB_SPEC>

Start a new dSort job with the provided specification. The specification can be given either as an inline argument or via the -f flag; providing both results in an error. Upon creation, the job's JOB_ID is returned; it can then be used to abort the job or retrieve its metrics.

| Flag | Type | Description | Default |
| --- | --- | --- | --- |
| --spec, -f | string | path to JSON or YAML specification; providing - will result in reading from STDIN | "" |

The following table describes JSON/YAML keys which can be used in the specification.

| Key | Type | Description | Required | Default |
| --- | --- | --- | --- | --- |
| extension | string | extension of input and output shards (either .tar, .tgz or .zip) | yes | |
| input_format.template | string | name template for input shards | yes | |
| output_format | string | name template for output shards | yes | |
| input_bck.name | string | bucket name where shard objects are stored | yes | |
| input_bck.provider | string | bucket backend provider, see docs | no | "ais" |
| output_bck.name | string | bucket name where new output shards will be saved | no | same as input_bck.name |
| output_bck.provider | string | bucket backend provider, see docs | no | same as input_bck.provider |
| description | string | description of the dSort job | no | "" |
| output_shard_size | string | size (in bytes) of an output shard; can be a raw number like 10240 or suffixed like 10KB | yes | |
| algorithm.kind | string | sorting algorithm used by the dSort job; one of: "alphanumeric", "shuffle", "content" | no | "alphanumeric" |
| algorithm.decreasing | bool | whether the algorithm should sort records in decreasing or increasing order; used for kind=alphanumeric or kind=content | no | false |
| algorithm.seed | string | seed provided to the random generator; used when kind=shuffle | no | "" (time.Now() is used) |
| algorithm.extension | string | content of the file with the provided extension is used as the sorting key; used when kind=content | yes (only when kind=content) | |
| algorithm.content_key_type | string | content key type; one of: "int", "float", or "string"; used exclusively with kind=content sorting | yes (only when kind=content) | |
| ekm_file | string | URL of the file containing the external key map (lines in the format: record_key[sep]shard-%d-fmt) | yes (only when output_format not provided) | "" |
| ekm_file_sep | string | separator used to split record_key and shard-%d-fmt in the lines of the external key map | no | \t (TAB) |
| max_mem_usage | string | limits the total system memory allocated by dSort and other running processes combined; once this threshold is crossed, dSort continues extracting onto local drives; can be given as 60% or 10GB | no | same as in /deploy/dev/local/aisnode_config.sh |
| extract_concurrency_max_limit | int | maximum number of shards extracted concurrently per disk | no | ~50 (calculated based on different factors) |
| create_concurrency_max_limit | int | maximum number of shards created concurrently per disk | no | ~50 (calculated based on different factors) |
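To illustrate the suffixed sizes accepted by output_shard_size, here is a rough Python sketch of the conversion (an assumption based on the "10KB == 10240 bytes" convention in the examples below; this is not the actual AIS parser):

```python
import re

# 1024-based multipliers, matching the 10KB == 10240 bytes convention
_UNITS = {"": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}

def parse_size(s: str) -> int:
    """Convert a raw or suffixed size string (e.g. "10240" or "10KB") to bytes."""
    m = re.fullmatch(r"(\d+)\s*([KMGT]?B)?", s.strip().upper())
    if not m:
        raise ValueError(f"invalid size: {s!r}")
    return int(m.group(1)) * _UNITS[m.group(2) or ""]

print(parse_size("10KB"))   # → 10240
```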

Some of the values from the global distributed_sort config can also be overridden via the job specification. All of them are optional; if empty, the value from the global distributed_sort config is used. For more information, refer to configuration.

| Key | Type | Description |
| --- | --- | --- |
| duplicated_records | string | what to do when duplicated records are found: "ignore" - ignore and continue, "warn" - notify the user and continue, "abort" - abort the dSort operation |
| missing_shards | string | what to do when missing shards are detected: "ignore" - ignore and continue, "warn" - notify the user and continue, "abort" - abort the dSort operation |
| ekm_malformed_line | string | what to do when the external key map contains a malformed line: "ignore" - ignore and continue, "warn" - notify the user and continue, "abort" - abort the dSort operation |
| ekm_missing_key | string | what to do when the external key map is missing a key: "ignore" - ignore and continue, "warn" - notify the user and continue, "abort" - abort the dSort operation |
| dsorter_mem_threshold | string | minimum free-memory threshold that activates the specialized dsorter type, which uses memory during the creation phase; benchmarks show that this type behaves better than the general type |
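For example, a job specification could tighten these settings as follows (a sketch; all of the override keys are optional):

```
{
    "extension": ".tar",
    "input_bck": {"name": "dsort-testing"},
    "input_format": {"template": "shard-{0..9}"},
    "output_format": "new-shard-{0000..1000}",
    "output_shard_size": "10KB",
    "duplicated_records": "abort",
    "missing_shards": "warn"
}
```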

Examples

Sort records inside the shards

The command below starts an (alphanumeric) sorting job with extended metrics for input shards named shard-0.tar, shard-1.tar, …, shard-9.tar. Each of the output shards will contain at least 10240 bytes (10KB) and will be named new-shard-0000.tar, new-shard-0001.tar, …

Assuming that dsort_spec.json contains:

{
    "extension": ".tar",
    "input_bck": {"name": "dsort-testing"},
    "input_format": {
        "template": "shard-{0..9}"
    },
    "output_format": "new-shard-{0000..1000}",
    "output_shard_size": "10KB",
    "description": "sort shards from 0 to 9",
    "algorithm": {
        "kind": "alphanumeric"
    }
}

You can start the dSort job with:

$ ais start dsort -f dsort_spec.json
JGHEoo89gg

Shuffle records

The command below starts a basic shuffle job for input shards named shard-0.tar, shard-1.tar, …, shard-9.tar. Each of the output shards will contain at least 10240 bytes (10KB) and will be named new-shard-0000.tar, new-shard-0001.tar, …

$ ais start dsort -f - <<EOM
extension: .tar
input_bck:
    name: dsort-testing
input_format:
    template: shard-{0..9}
output_format: new-shard-{0000..1000}
output_shard_size: 10KB
description: shuffle shards from 0 to 9
algorithm:
    kind: shuffle
EOM
JGHEoo89gg

Pack records into shards with different categories - EKM (External Key Map)

One of dSort's key features is that the user can specify the exact mapping from record key to output shard. To use this feature, output_format should be empty, and ekm_file (as well as ekm_file_sep) must be set. The output shards will be created with the provided template format.

Assuming that ekm_file (URL: http://website.web/static/ekm_file.txt) has content:

cat_0.txt shard-cats-%d
cat_1.txt shard-cats-%d
...
dog_0.txt shard-dogs-%d
dog_1.txt shard-dogs-%d
...
car_0.txt shard-car-%d
car_1.txt shard-car-%d
...
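As an illustration, such a text-format key map can be read into a record-to-shard mapping with a few lines of Python (a sketch only; the real parsing happens inside dSort):

```python
def parse_ekm(text: str, sep: str = " ") -> dict:
    """Map each record key to its output shard format (record_key[sep]shard-fmt per line)."""
    ekm = {}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        record_key, shard_fmt = line.split(sep, 1)
        ekm[record_key] = shard_fmt
    return ekm

sample = """cat_0.txt shard-cats-%d
dog_0.txt shard-dogs-%d
car_0.txt shard-car-%d"""
print(parse_ekm(sample)["cat_0.txt"])   # → shard-cats-%d
```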

or, if ekm_file is a JSON file (URL: http://website.web/static/ekm_file.json - notice the .json extension) with content:

{
    "shard-cats-%d": [
        "cat_0.txt",
        "cat_1.txt",
        ...
    ],
    "shard-dogs-%d": [
        "dog_0.txt",
        "dog_1.txt",
        ...
    ],
    "shard-car-%d": [
        "car_0.txt",
        "car_1.txt",
        ...
    ],
    ...
}

Alternatively, you can use regexes as record identifiers: the ekm_file can contain regex patterns, matching every record that fits a pattern to the provided shard format.

{
    "shard-cats-%d": [
        "cat_[0-9]+\\.txt"
    ],
    "shard-dogs-%d": [
        "dog_[0-9]+\\.txt"
    ],
    "shard-car-%d": [
        "car_[0-9]+\\.txt"
    ],
    ...
}
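To illustrate the regex variant, here is a sketch of matching record names against such patterns (assumed semantics for illustration; the actual matching is implemented inside dSort):

```python
import re

# shard format -> list of regex patterns, as in the JSON example above
ekm = {
    "shard-cats-%d": [r"cat_[0-9]+\.txt"],
    "shard-dogs-%d": [r"dog_[0-9]+\.txt"],
    "shard-car-%d": [r"car_[0-9]+\.txt"],
}

def shard_for(record: str) -> str:
    """Return the output shard format whose pattern matches the record name."""
    for shard_fmt, patterns in ekm.items():
        if any(re.fullmatch(p, record) for p in patterns):
            return shard_fmt
    raise KeyError(record)

print(shard_for("cat_42.txt"))   # → shard-cats-%d
```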

and the content of the input shards looks more or less like this:

shard-0.tar:
- cat_0.txt
- dog_0.txt
- car_0.txt
...
shard-1.tar:
- cat_1.txt
- dog_1.txt
- car_1.txt
...

You can run:

$ ais start dsort --spec '{
    "extension": ".tar",
    "input_bck": {"name": "dsort-testing"},
    "input_format": {"template": "shard-{0..9}"},
    "output_shard_size": "200KB",
    "description": "pack records into categorized shards",
    "ekm_file": "http://website.web/static/ekm_file.txt",
    "ekm_file_sep": " "
}'
JGHEoo89gg

After the run, the output shards will look more or less like this (the number of records in a given shard depends on the provided output_shard_size):

shard-cats-0.tar:
- cat_1.txt
- cat_2.txt
shard-cats-1.tar:
- cat_3.txt
- cat_4.txt
...
shard-dogs-0.tar:
- dog_1.txt
- dog_2.txt
...

EKM also supports template syntax to express output shard names. For example, if ekm_file has content:

{
    "shard-{0..100..3}-cats": [
        "cat_0.txt",
        "cat_1.txt",
        "cat_3.txt",
        "cat_4.txt",
        "cat_5.txt",
        "cat_6.txt",
        ...
    ],
    "shard-@00001-gap-@100-dogs": [
        "dog_0.txt",
        "dog_1.txt",
        ...
    ],
    "shard-%06d-cars": [
        "car_0.txt",
        "car_1.txt",
        ...
    ],
    ...
}

After running dSort, the output would look like this:

shard-0-cats.tar:
- cat_0.txt
- cat_1.txt
shard-3-cats.tar:
- cat_2.txt
- cat_3.txt
shard-6-cats.tar:
- cat_4.txt
- cat_5.txt
...
shard-00001-gap-001-dogs.tar:
- dog_0.txt
- dog_1.txt
shard-00001-gap-002-dogs.tar:
- dog_2.txt
- dog_3.txt
...
shard-1-cars.tar:
- car_0.txt
- car_1.txt
shard-2-cars.tar:
- car_2.txt
- car_3.txt
...
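A brace-range template such as shard-{0..100..3}-cats expands into successive shard names. The expansion can be sketched in a few lines of Python (illustrative only, covering just the {start..end..step} form):

```python
import re

def expand_template(template: str):
    """Expand one {start..end..step} brace range into successive names."""
    m = re.search(r"\{(\d+)\.\.(\d+)(?:\.\.(\d+))?\}", template)
    if not m:
        raise ValueError("no brace range in template")
    start, end = int(m.group(1)), int(m.group(2))
    step = int(m.group(3)) if m.group(3) else 1
    width = len(m.group(1))  # preserve zero-padding, e.g. {0000..1000}
    for i in range(start, end + 1, step):
        yield template[:m.start()] + str(i).zfill(width) + template[m.end():]

names = list(expand_template("shard-{0..100..3}-cats"))
print(names[:3])   # → ['shard-0-cats', 'shard-3-cats', 'shard-6-cats']
```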

Show dSort jobs and job status

ais show job dsort [JOB_ID]

Retrieve the status of the dSort job with the provided JOB_ID (returned upon creation). If the JOB_ID argument is omitted, list all dSort jobs.

Options

$ ais show job dsort --help

NAME:
   ais show job - Show running and/or finished jobs,
   e.g.:
   - show job tco-cysbohAGL - show a given (multi-object copy/transform) job identified by its unique ID;
   - show job copy-listrange - show all running multi-object copies;
   - show job copy-objects - same as above (using display name);
   - show job copy - show all copying jobs including both bucket-to-bucket and multi-object;
   - show job copy-objects --all - show both running and already finished (or stopped) multi-object copies;
   - show job list - show all running list-objects jobs;
   - show job ls - same as above;
   - show job ls --refresh 10 - same as above with periodic _refreshing_ every 10 seconds;
   - show job ls --refresh 10 --count 4 - same as above but only for the first four 10-seconds intervals;
   - show job prefetch-listrange - show all running prefetch jobs;
   - show job prefetch - same as above;
   - show job prefetch --refresh 1m - show all running prefetch jobs at 1 minute intervals (until Ctrl-C);
   - show job evict - all running bucket and/or data evicting jobs;
   - show job --all - show absolutely all jobs, running and finished.

USAGE:
   ais show job [NAME] [JOB_ID] [NODE_ID] [BUCKET] [command options]

OPTIONS:
   --all              Include all jobs: running, finished, and aborted
   --count value      Used together with '--refresh' to limit the number of generated reports, e.g.:
                      '--refresh 10 --count 5' - run 5 times with 10s interval (default: 0)
   --date-time        Override the default hh:mm:ss (hours, minutes, seconds) time format - include calendar date as well
   --json, -j         JSON input/output
   --log value        Filename to log metrics (statistics)
   --no-headers, -H   Display tables without headers
   --progress         Show progress bar(s) and progress of execution in real time
   --refresh value    Time interval for continuous monitoring; can be also used to update progress bar (at a given interval);
                      valid time units: ns, us (or µs), ms, s (default), m, h
   --regex value      Regular expression to select jobs by name, kind, or description, e.g.: --regex "ec|mirror|elect"
   --units value      Show statistics and/or parse command-line specified sizes using one of the following units of measurement:
                      iec - IEC format, e.g.: KiB, MiB, GiB (default)
                      si  - SI (metric) format, e.g.: KB, MB, GB
                      raw - do not convert to (or from) human-readable format
   --verbose, -v      Show extended statistics
   --help, -h         Show help

Examples

Show dSort jobs with description matching provided regex

Show all dSort jobs whose descriptions start with the sort prefix.

$ ais show job dsort --regex "^sort (.*)"
JOB ID       STATUS     START           FINISH          DESCRIPTION
nro_Y5h9n    Finished   03-16 11:39:07  03-16 11:39:07  sort shards from 0 to 9
Key_Y5h9n    Finished   03-16 11:39:23  03-16 11:39:23  sort shards from 10 to 19
enq9Y5Aqn    Finished   03-16 11:39:34  03-16 11:39:34  sort shards from 20 to 29

Save metrics to log file

Save newly fetched metrics of the dSort job with ID 5JjIuGemR to the /tmp/dsort_run.txt file every 500 milliseconds:

$ ais show job dsort 5JjIuGemR --refresh 500ms --log "/tmp/dsort_run.txt"
Dsort job has finished successfully in 21.948806ms:
  Longest extraction:  1.49907ms
  Longest sorting:     8.288299ms
  Longest creation:    4.553µs

Show only JSON metrics

$ ais show job dsort 5JjIuGemR --json
{
    "825090t8089": {
        "local_extraction": {
            "started_time": "2020-05-28T09:53:42.466267891-04:00",
            "end_time": "2020-05-28T09:53:42.50773835-04:00",
            ....
        },
        ....
    },
    ....
}

Show only JSON metrics filtered by daemon ID

$ ais show job dsort 5JjIuGemR 766516t8087 --json
{
    "766516t8087": {
        "local_extraction": {
            "started_time": "2020-05-28T09:53:42.466267891-04:00",
            "end_time": "2020-05-28T09:53:42.50773835-04:00",
            ....
        },
        ....
    }
}

Using jq to filter the JSON-formatted metric output

Show the running status of the meta-sorting phase for all targets.

$ ais show job dsort 5JjIuGemR --json | jq .[].meta_sorting.running
false
false
true
false

Show the number of shards created on each target, along with the target IDs.

$ ais show job dsort 5JjIuGemR --json | jq 'to_entries[] | [.key, .value.shard_creation.created_count]'
[
  "766516t8087",
  "189"
]
[
  "710650t8086",
  "207"
]
[
  "825090t8089",
  "211"
]
[
  "743838t8088",
  "186"
]
[
  "354275t8085",
  "207"
]

Stop dSort job

ais stop dsort JOB_ID

Stop the dSort job with the given JOB_ID.

Remove dSort job

ais job rm dsort JOB_ID

Remove the finished dSort job with the given JOB_ID from the job list.

Wait for dSort job

ais wait dsort JOB_ID

or, same:

ais wait JOB_ID

Wait for the dSort job with the given JOB_ID to finish.

Options

$ ais wait --help

NAME:
   ais wait - (alias for "job wait") wait for a specific batch job to complete (press <TAB-TAB> to select, '--help' for more options)

USAGE:
   ais wait [NAME] [JOB_ID] [NODE_ID] [BUCKET] [command options]

OPTIONS:
   --progress        Show progress bar(s) and progress of execution in real time
   --refresh value   Time interval for continuous monitoring; can be also used to update progress bar (at a given interval);
                     valid time units: ns, us (or µs), ms, s (default), m, h
   --timeout value   Maximum time to wait for a job to finish; if omitted: wait forever or until Ctrl-C;
                     valid time units: ns, us (or µs), ms, s (default), m, h
   --help, -h        Show help