{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Using Tensorflow DALI plugin: using various readers\n", "\n", "### Overview\n", "\n", "This example shows how different readers could be used to interact with Tensorflow. It shows how flexible DALI is.\n", "\n", "The following readers are used in this example:\n", "\n", "- MXNetReader\n", "- CaffeReader\n", "- FileReader\n", "- TFRecordReader\n", "\n", "For details on how to use them please see other [examples](..)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us start with defining some global constants" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# MXNet RecordIO\n", "db_folder = \"/data/imagenet/train-480-val-256-recordio/\"\n", "\n", "# Caffe LMDB\n", "lmdb_folder = \"/data/imagenet/train-lmdb-256x256\"\n", "\n", "# image dir with plain jpeg files\n", "image_dir = \"../images\"\n", "\n", "# TFRecord\n", "tfrecord = \"/data/imagenet/train-val-tfrecord-480/train-00001-of-01024\"\n", "tfrecord_idx = \"idx_files/train-00001-of-01024.idx\"\n", "tfrecord2idx_script = \"tfrecord2idx\"\n", "\n", "N = 8 # number of GPUs\n", "BATCH_SIZE = 128 # batch size per GPU\n", "ITERATIONS = 32\n", "IMAGE_SIZE = 3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create idx file by calling `tfrecord2idx` script" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from subprocess import call\n", "import os.path\n", "\n", "if not os.path.exists(\"idx_files\"):\n", " os.mkdir(\"idx_files\")\n", "\n", "if not os.path.isfile(tfrecord_idx):\n", " call([tfrecord2idx_script, tfrecord, tfrecord_idx])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us define:\n", "- common part of pipeline, other pipelines will inherit it" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "from nvidia.dali.pipeline import Pipeline\n", "import nvidia.dali.ops as ops\n", "import nvidia.dali.types as types\n", "\n", "class CommonPipeline(Pipeline):\n", " def __init__(self, batch_size, num_threads, device_id):\n", " super(CommonPipeline, self).__init__(batch_size, num_threads, device_id)\n", "\n", " self.decode = ops.nvJPEGDecoder(device = \"mixed\", output_type = types.RGB)\n", " self.resize = ops.Resize(device = \"gpu\",\n", " image_type = types.RGB,\n", " interp_type = types.INTERP_LINEAR)\n", " self.cmn = ops.CropMirrorNormalize(device = \"gpu\",\n", " output_dtype = types.FLOAT,\n", " crop = (227, 227),\n", " image_type = types.RGB,\n", " mean = [128., 128., 128.],\n", " std = [1., 1., 1.])\n", " self.uniform = ops.Uniform(range = (0.0, 1.0))\n", " self.resize_rng = ops.Uniform(range = (256, 480))\n", "\n", " def base_define_graph(self, inputs, labels):\n", " images = self.decode(inputs)\n", " images = self.resize(images, resize_shorter = self.resize_rng())\n", " output = self.cmn(images, crop_pos_x = self.uniform(),\n", " crop_pos_y = self.uniform())\n", " return (output, labels.gpu())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- MXNetReaderPipeline" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "from nvidia.dali.pipeline import Pipeline\n", "import nvidia.dali.ops as ops\n", "import nvidia.dali.types as types\n", "\n", "class MXNetReaderPipeline(CommonPipeline):\n", " def __init__(self, batch_size, num_threads, device_id, num_gpus):\n", " super(MXNetReaderPipeline, self).__init__(batch_size, num_threads, device_id)\n", " self.input = ops.MXNetReader(path = [db_folder+\"train.rec\"], index_path=[db_folder+\"train.idx\"],\n", " random_shuffle = True, shard_id = device_id, num_shards = num_gpus)\n", "\n", " def define_graph(self):\n", " images, labels = self.input(name=\"Reader\")\n", " return self.base_define_graph(images, labels)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- CaffeReadPipeline" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "class CaffeReadPipeline(CommonPipeline):\n", " def __init__(self, batch_size, num_threads, device_id, num_gpus):\n", " super(CaffeReadPipeline, self).__init__(batch_size, num_threads, device_id)\n", " self.input = ops.CaffeReader(path = lmdb_folder,\n", " random_shuffle = True, shard_id = device_id, num_shards = num_gpus)\n", "\n", " def define_graph(self):\n", " images, labels = self.input()\n", " return self.base_define_graph(images, labels)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- FileReadPipeline" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "class FileReadPipeline(CommonPipeline):\n", " def __init__(self, batch_size, num_threads, device_id, num_gpus):\n", " super(FileReadPipeline, self).__init__(batch_size, num_threads, device_id)\n", " self.input = ops.FileReader(file_root = image_dir)\n", "\n", " def define_graph(self):\n", " images, labels = self.input()\n", " return self.base_define_graph(images, labels)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- TFRecordPipeline" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "import nvidia.dali.tfrecord as tfrec\n", "\n", "class TFRecordPipeline(CommonPipeline):\n", " def __init__(self, batch_size, num_threads, device_id, num_gpus):\n", " super(TFRecordPipeline, self).__init__(batch_size, num_threads, device_id)\n", " self.input = ops.TFRecordReader(path = tfrecord, \n", " index_path = tfrecord_idx,\n", " features = {\"image/encoded\" : tfrec.FixedLenFeature((), tfrec.string, \"\"),\n", " \"image/class/label\": tfrec.FixedLenFeature([1], tfrec.int64, -1)\n", " })\n", "\n", " def define_graph(self):\n", " inputs = self.input()\n", " images = inputs[\"image/encoded\"]\n", " labels = inputs[\"image/class/label\"]\n", " return self.base_define_graph(images, labels)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let us create function which builds pipeline on demand:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "import tensorflow as tf\n", "import nvidia.dali.plugin.tf as dali_tf\n", "\n", "def get_batch_test_dali(batch_size, pipe_type):\n", " pipe_name, label_type, _ = pipe_type\n", " pipes = [pipe_name(batch_size=batch_size, num_threads=2, device_id = device_id, num_gpus = N) for device_id in range(N)]\n", "\n", " daliop = dali_tf.DALIIterator()\n", " images = []\n", " labels = []\n", " for d in range(N):\n", " with tf.device('/gpu:%i' % d):\n", " image, label = daliop(pipeline = pipes[d],\n", " shapes = [(BATCH_SIZE, 3, 227, 227), ()],\n", " dtypes = [tf.int32, label_type],\n", " device_id = d)\n", " images.append(image)\n", " labels.append(label)\n", "\n", " return [images, labels]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "At the end let us test if all pipelines have been correctly built and run with TF session" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "RUN: MXNetReaderPipeline\n", "OK : MXNetReaderPipeline\n", "RUN: CaffeReadPipeline\n", "OK : CaffeReadPipeline\n", "RUN: FileReadPipeline\n", "OK : FileReadPipeline\n", "RUN: TFRecordPipeline\n", "OK : TFRecordPipeline\n" ] } ], "source": [ "from __future__ import print_function\n", "import numpy as np\n", "\n", "pipe_types = [[MXNetReaderPipeline, tf.float32, (0, 999)], \n", " [CaffeReadPipeline, tf.int32, (0, 999)],\n", " [FileReadPipeline, tf.int32, (0, 1)], \n", " [TFRecordPipeline, tf.int64, (1, 1000)]]\n", "for pipe_name in pipe_types:\n", " print (\"RUN: \" + pipe_name[0].__name__)\n", " test_batch = get_batch_test_dali(BATCH_SIZE, pipe_name)\n", " x = tf.placeholder(tf.float32, shape=[BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, 3], name='x')\n", " gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.8)\n", " config = tf.ConfigProto(gpu_options=gpu_options)\n", "\n", " with tf.Session(config=config) as sess:\n", " for i in range(ITERATIONS):\n", " imgs, labels = sess.run(test_batch)\n", " # Testing correctness of labels\n", " for label in labels:\n", " ## labels need to be integers\n", " assert(np.equal(np.mod(label, 1), 0).all())\n", " ## labels need to be in range pipe_name[2]\n", " assert((label >= pipe_name[2][0]).all())\n", " assert((label <= pipe_name[2][1]).all())\n", " print(\"OK : \" + pipe_name[0].__name__)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.15rc1" } }, "nbformat": 4, "nbformat_minor": 2 }