Compute MD5

In this example, we will see how ETL can be used to do something as simple as computing the MD5 checksum of an object. We will go over the two ways of starting ETL (init code and init spec) to achieve our goal. Get ready!

Note: ETL is still in development so some steps may not work exactly as written below.

Prerequisites

Prepare ETL

To showcase ETL’s capabilities, we will go over a simple ETL container that computes the MD5 checksum of the object. There are three ways of approaching this problem:

  1. Simplified flow

    In this example, we will be using the python3.11v2 runtime. In the simplified flow, we only need to write a transform function, which can look like this (code.py):

    import hashlib

    def transform(input_bytes):
        md5 = hashlib.md5()
        md5.update(input_bytes)
        return md5.hexdigest().encode()

    The transform function must take bytes as an argument (the object's content) and return the output bytes that will be saved in the transformed object.
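
Since the transform function is plain Python, it can be sanity-checked locally before initializing the ETL. A minimal sketch (the function body is copied from code.py above; the payload is arbitrary):

```python
import hashlib

# Same transform as in code.py: bytes in, MD5 hex digest (as bytes) out.
def transform(input_bytes):
    md5 = hashlib.md5()
    md5.update(input_bytes)
    return md5.hexdigest().encode()

# The result must match hashlib's digest of the same payload.
payload = b"hello world"
assert transform(payload) == hashlib.md5(payload).hexdigest().encode()
print(transform(payload).decode())
```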

    Once we have the transform function defined, we can use the CLI to build and initialize ETL:

    $ ais etl init code --spec=code.py --runtime=python3.11v2 --name=transformer-md5 --comm-type hpull
    transformer-md5
  2. Simplified flow with input/output

    Similar to the example above, we will be using the python3.11v2 runtime. However, the Python code in this case reads the data from standard input and writes the output bytes to standard output, as shown in the following code.py:

    import hashlib
    import sys

    md5 = hashlib.md5()
    for chunk in iter(lambda: sys.stdin.buffer.read(1 << 20), b""):
        md5.update(chunk)
    sys.stdout.buffer.write(md5.hexdigest().encode())
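
The stdin/stdout contract can be mimicked locally with in-memory buffers. A quick sketch (the chunked-read loop mirrors code.py above; the payload is arbitrary):

```python
import hashlib
import io

# Simulate the io:// contract: read the "object" from a stdin-like
# buffer in chunks, write the hex digest to a stdout-like buffer.
payload = b"some bytes to hash"
stdin = io.BytesIO(payload)
stdout = io.BytesIO()

md5 = hashlib.md5()
for chunk in iter(lambda: stdin.read(1 << 20), b""):
    md5.update(chunk)
stdout.write(md5.hexdigest().encode())

assert stdout.getvalue() == hashlib.md5(payload).hexdigest().encode()
print(stdout.getvalue().decode())
```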

    We can now use the CLI to build and initialize ETL with the io:// communicator type:

    $ ais etl init code --spec=code.py --runtime=python3.11v2 --comm-type="io://" --name="compute-md5"
    compute-md5
  3. Regular flow

    First, we need to write a server. In this case, we will write a Python 3 HTTP server. The code for it can look like this (server.py):

    #!/usr/bin/env python

    import argparse
    import hashlib
    from http.server import HTTPServer, BaseHTTPRequestHandler


    class S(BaseHTTPRequestHandler):
        def _set_headers(self):
            self.send_response(200)
            self.send_header("Content-type", "text/plain")
            self.end_headers()

        def do_POST(self):
            content_length = int(self.headers["Content-Length"])
            post_data = self.rfile.read(content_length)
            md5 = hashlib.md5()
            md5.update(post_data)

            self._set_headers()
            self.wfile.write(md5.hexdigest().encode())


    def run(server_class=HTTPServer, handler_class=S, addr="localhost", port=8000):
        server_address = (addr, port)
        httpd = server_class(server_address, handler_class)

        print(f"Starting httpd server on {addr}:{port}")
        httpd.serve_forever()


    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description="Run a simple HTTP server")
        parser.add_argument(
            "-l",
            "--listen",
            default="localhost",
            help="Specify the IP address on which the server listens",
        )
        parser.add_argument(
            "-p",
            "--port",
            type=int,
            default=8000,
            help="Specify the port on which the server listens",
        )
        args = parser.parse_args()
        run(addr=args.listen, port=args.port)
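
The handler can be smoke-tested in-process before building the image. A sketch using a copy of the handler logic above and an OS-assigned free port:

```python
import hashlib
import threading
import urllib.request
from http.server import HTTPServer, BaseHTTPRequestHandler

# Minimal copy of the request handler from server.py above.
class S(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers["Content-Length"])
        data = self.rfile.read(length)
        self.send_response(200)
        self.send_header("Content-type", "text/plain")
        self.end_headers()
        self.wfile.write(hashlib.md5(data).hexdigest().encode())

    def log_message(self, *args):  # silence per-request logging in the test
        pass

httpd = HTTPServer(("localhost", 0), S)  # port 0: let the OS pick a free port
port = httpd.server_address[1]
threading.Thread(target=httpd.serve_forever, daemon=True).start()

# POST a payload and check the response against hashlib's own digest.
payload = b"some text :)"
req = urllib.request.Request(f"http://localhost:{port}/", data=payload, method="POST")
with urllib.request.urlopen(req) as resp:
    body = resp.read()

assert body == hashlib.md5(payload).hexdigest().encode()
print(body.decode())
httpd.shutdown()
```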

    Once we have a server that computes the MD5, we need to create a container image out of it. For that, we need to write a Dockerfile, which can look like this:

    FROM python:3.8.5-alpine3.11

    RUN mkdir /code
    WORKDIR /code
    COPY server.py server.py

    EXPOSE 80

    ENTRYPOINT [ "/code/server.py", "--listen", "0.0.0.0", "--port", "80" ]

    Once we have the Dockerfile, we must build the image and push it to a Docker registry so that our Kubernetes cluster can pull it later. In this example, we will use the docker.io registry.

    $ docker build -t docker.io/aistore/md5_server:v1 .
    $ docker push docker.io/aistore/md5_server:v1

    The next step is to create the spec of a Pod that will be run on Kubernetes (spec.yaml):

    apiVersion: v1
    kind: Pod
    metadata:
      name: transformer-md5
    spec:
      containers:
        - name: server
          image: docker.io/aistore/md5_server:v1
          ports:
            - name: default
              containerPort: 80
          command: ['/code/server.py', '--listen', '0.0.0.0', '--port', '80']

    Important: the server must listen on the same port as specified in ports.containerPort. This is required, as a target needs to know the precise socket address of the ETL container.

    Once we have our spec.yaml, we can initialize ETL with CLI:

    $ ais etl init spec --spec=spec.yaml --name=transformer-md5 --comm-type="hpush://"
    transformer-md5

Just before we started ETL containers, our Pods looked like this:

$ kubectl get pods
NAME                   READY   STATUS    RESTARTS   AGE
demo-ais-admin-99p8r   1/1     Running   0          31m
demo-ais-proxy-5vqb8   1/1     Running   0          31m
demo-ais-proxy-g7jf7   1/1     Running   0          31m
demo-ais-target-0      1/1     Running   0          31m
demo-ais-target-1      1/1     Running   0          29m

We can see that the cluster is running with one proxy and two targets. After we initialize the ETL, we expect two more Pods to be started (#targets == #etl_containers).

$ kubectl get pods
NAME                    READY   STATUS    RESTARTS   AGE
demo-ais-admin-99p8r    1/1     Running   0          41m
demo-ais-proxy-5vqb8    1/1     Running   0          41m
demo-ais-proxy-g7jf7    1/1     Running   0          41m
demo-ais-target-0       1/1     Running   0          41m
demo-ais-target-1       1/1     Running   0          39m
transformer-md5-fgjk3   1/1     Running   0          1m
transformer-md5-vspra   1/1     Running   0          1m

As expected, two more Pods are up and running - one for each target.

ETL containers will be run on the same node as the targets that started them. In other words, each ETL container runs close to data and does not generate any extract-transform-load related network traffic. Given that there are as many ETL containers as storage nodes (one container per target) and that all ETL containers run in parallel, the cumulative “transformation” bandwidth scales proportionally to the number of storage nodes and disks.
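
As a back-of-the-envelope illustration of that scaling (the per-node bandwidth figure below is hypothetical, purely for arithmetic):

```python
# Hypothetical per-node transform bandwidth, in MB/s (not a measured value).
PER_NODE_MBPS = 500

# One ETL container per target, all running in parallel: cumulative
# bandwidth grows linearly with the number of storage nodes.
for targets in (1, 2, 4, 8):
    print(f"{targets} target(s) -> {targets * PER_NODE_MBPS} MB/s aggregate")
```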

Finally, we can use newly created Pods to transform the objects on the fly for us:

$ ais create transform
$ echo "some text :)" | ais put - transform/shard.in
$ ais etl object transformer-md5 transform/shard.in -
393c6706efb128fbc442d3f7d084a426

Voilà! The ETL container successfully computed the MD5 of the transform/shard.in object.
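
The value can be cross-checked locally. Note that echo appends a trailing newline, so the newline is part of the stored object's content (a sketch, assuming the default echo behavior):

```python
import hashlib

# Content written to transform/shard.in above; echo adds the trailing "\n".
digest = hashlib.md5(b"some text :)\n").hexdigest()
print(digest)
```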

Alternatively, one can use the offline ETL feature to transform the whole bucket.

$ ais create transform
$ echo "some text :)" | ais put - transform/shard.in
$ ais etl bucket transformer-md5 ais://transform ais://transform-md5 --wait

Once the ETL is no longer needed, the Pods can be stopped with:

$ ais etl stop transformer-md5
ETL containers stopped successfully.
$ kubectl get pods
NAME                   READY   STATUS    RESTARTS   AGE
demo-ais-admin-99p8r   1/1     Running   0          50m
demo-ais-proxy-5vqb8   1/1     Running   0          50m
demo-ais-proxy-g7jf7   1/1     Running   0          49m
demo-ais-target-0      1/1     Running   0          50m
demo-ais-target-1      1/1     Running   0          49m