
Microsoft SQL Server’dan AWS S3 bucket üzerindeki bir klasöre veri aktarımı için öncelikle NiFi arayüzünden Controller Settings bölümünden SQL Server JDBC sürücüsünün eklenmesi gerekir.
Controller Settings

Controller Services sekmesine geçilir ve yeni bir kayıt eklemek için artı simgesine tıklanır.

Açılan Add Controller Service penceresinde arama kutusundan aratılarak DBCPConnectionPool seçilir ve Add butonuna tıklanır.

Kayıt eklendikten sonra ayarlar bölümüne gidilir.

Name kısmından uygun bir ad verilir (ör: MS-SQL) ve Properties sekmesine geçilir.

Bu ekranda sırasıyla aşağıdaki parametrelerin değerleri yazılır:
Database Connection URL
Verinin alınacağı SQL sunucusunun JDBC bağlantı ifadesi. Bu örnekte bir Azure SQL veritabanından alındığı için ifade şunun gibi olacaktır:
jdbc:sqlserver://{sql-server-adı}.database.windows.net:1433;database={veritabanı-adı};user={kullanıcı-adı}@{sql-server-adı};password={şifre};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;
Database Driver Class Name
SQL Server için bu değer com.microsoft.sqlserver.jdbc.SQLServerDriver olmalıdır.
Database Driver Location(s)
Aşağıdaki adresten Microsoft JDBC Driver for SQL Server indirilir:

İndirilen klasör içerisindeki mssql-jdbc-12.2.0.jre11.jar dosyası, sabit bir klasöre kopyalanır. Bu dosyanın tam yolu da bu kısma yazılır.

Database User
Veritabanı kullanıcı adı.
Password
Veritabanına bağlanan kullanıcının şifresi.
Sonrasında OK butonuna tıklanır.
Henüz etkinleştirilmediği için Disabled statüsünde olduğu görülür ve sağ taraftaki yıldırım simgesine tıklanarak etkinleştirilir (Enable).

İşlem tamamlandığında State Enabled olarak değişecek ve simge de Disable şeklinde değişecektir.

Processors
Veriyi aktarma işlemi üç adımda olacak: SQL Server’dan veriyi çekme, Avro dosyasını Parquet formatına dönüştürme ve S3 bucket’a yazma.
SQL Server’dan veriyi çekme (QueryDatabaseTable)
Sol üstteki Processor simgesi tıklanarak boş alana sürüklenip bırakılır.

Açılan Add Processor penceresindeki arama kutusundan aratılarak QueryDatabaseTable seçilir ve Add butonuna tıklanır.

NiFi ekranında oluşan processor üzerine çift tıklanarak ya da sağ tıklayıp Configure seçilerek ayarlar ekranına gelinir.
Uygun bir isim verilip Scheduling sekmesine geçilir. Hangi periyotta çalıştırılmak isteniyorsa Run Schedule kısmında bu periyot belirtilir.

Sonra Properties sekmesine geçilir. Bu ekranda sırasıyla aşağıdaki parametrelerin değerleri yazılır:

- Database Connection Pooling Services: MS-SQL
- Database Type: MS SQL 2012+
- Table Name: Burada örnek olarak dbo.Calendar tablosu kullanılmıştır. Aktarılacak veritabanı tablosu {şema}.{tablo-adı} formatında yazılır.
Apply butonuna tıklanıp processor kaydedilir.
Avro dosyasını Parquet formatına dönüştürme (ConvertAvroToParquet)
NiFi çekilen veriyi Avro formatına dönüştüreceği için, bu örnek özelinde ihtiyacımız olan Parquet formatına dönüştürme işlemini ayrı bir adım olarak eklememiz gerekiyor.
Sol üstteki Processor simgesi tıklanarak boş alana sürüklenip bırakılır. Açılan Add Processor penceresindeki arama kutusundan aratılarak ConvertAvroToParquet seçilir ve Add butonuna tıklanır.

NiFi ekranında oluşan processor üzerine çift tıklanarak ya da sağ tıklayıp Configure seçilerek ayarlar ekranına gelinir.
Uygun bir isim verilip Properties sekmesine geçilir. Sıkıştırma isteniyorsa buradan sıkıştırma biçim seçilebilir. Bu örnek için değer Uncompressed olarak bırakılmıştır.

Relationships sekmesinden de Automatically Terminate / Retry Relationships altındaki terminate kutucuğu işaretlenir.

Apply butonuna tıklanarak processor kaydedilir.
S3 bucket’a yazma (PutS3Object)
Sol üstteki Processor simgesi tıklanarak boş alana sürüklenip bırakılır. Açılan Add Processor penceresindeki arama kutusundan aratılarak PutS3Object seçilir ve Add butonuna tıklanır.

NiFi ekranında oluşan processor üzerine çift tıklanarak ya da sağ tıklayıp Configure seçilerek ayarlar ekranına gelinir.
Uygun bir isim verilip Properties sekmesine geçilir. Bu ekranda sırasıyla aşağıdaki parametrelerin değerleri yazılır:

Object Key
Burada bir önceki çıktıda üretilen ${filename} varsayılan değer olarak görünür. Bu örnek için dosya formatını test_YYYY_MM_DD.parquet olarak yasmak için aşağıdaki expression kalıbı kullanılır:
${now():format('yyyy_MM_dd'):prepend("test_"):append(".parquet")}
Bucket
Oluşan Parquet dosyasının yazılacağı S3 bucket’ının adı (varsa alt klasörler de dahil olarak) yazılır.
Access Key ID
S3 bucket’ında yazma yetkisi olan kullanıcıya ait erişim anahtar ID’si yazılır.

Secret Access Key
S3 bucket’ında yazma yetkisi olan kullanıcıya ait erişim anahtarı yazılır.
Region
S3 bucket’in yer aldığı bölge bilgisi yazılır.

Relationships sekmesinden de Automatically Terminate / Retry Relationships altındaki terminate kutucuğu işaretlenir.

Son olarak oluşturulan processor’lar soldan başlanarak tıklanıp bir sonraki processor üzerine sürükleyip bırakarak birleştirilir.

Akışın nihai görüntüsü aşağıdaki gibi olmalıdır:

Her bir processor’ün üzerine tıklayıp, eğer Stopped durumdaysa Start’a tıklanarak aktif hale getirilir.

Başta belirlenen periyoda göre akış çalışmaya başlayacaktır. Akış sürekli çalışacağıdan bu örnekte aynı dosya tekrar oluşturulup bucket içerisine yazılacaktır.
Oluşan dosyalar bucket içerisinden doğrudan ya da örneğin Python kodu ile control edilebilir.

Örnek Python kodu aşağıdaki gibidir:
import boto3
s3 = boto3.resource(
service_name='s3',
region_name='eu-west-1',
aws_access_key_id='XXXXXXXXXXXXXXXXXXX',
aws_secret_access_key='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
)
for obj in s3.Bucket('test-folder').objects.all():
print(obj)