
Dataflow: writing a PCollection of GenericRecords to Parquet files

In an Apache Beam step, I have a PCollection of KV<String, Iterable<KV<Long, GenericRecord>>>. I want to write all the records in the Iterable to the same Parquet file. My code snippet is below:

p.apply(ParDo.of(new MapWithAvroSchemaAndConvertToGenericRecord())) // PCollection<GenericRecord>
.apply(ParDo.of(new MapKafkaGenericRecordValue(formatter, options.getFileNameDelimiter()))) //PCollection<KV<String, KV<Long, GenericRecord>>>
.apply(GroupByKey.create()) // PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>

Now I want to write all the records in each Iterable to the same Parquet file, deriving the file name from the key of the KV.

I found a solution to the problem. After the step

.apply(GroupByKey.create()) // PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>

I apply another transform that returns only the Iterable as the output PCollection:

.apply(ParDo.of(new GetIterable())) // PCollection<Iterable<KV<String, GenericRecord>>>

where the key of each KV is the name of the file I have to write to. The remaining snippet is:

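.apply(Flatten.iterables()) // PCollection<KV<String, GenericRecord>>
                .apply(
                        FileIO.<String, KV<String, GenericRecord>>writeDynamic()
                                // the destination (file key) of each element
                                .by((SerializableFunction<KV<String, GenericRecord>, String>) KV::getKey)
                                .via(
                                        // what is written to the file: the GenericRecord itself
                                        Contextful.fn(
                                                (SerializableFunction<KV<String, GenericRecord>, GenericRecord>) KV::getValue
                                        ),
                                        ParquetIO.sink(schema)
                                                .withCompressionCodec(CompressionCodecName.SNAPPY)
                                )
                                // on Dataflow, a location all workers can reach (e.g. a gs:// path) is likely needed here
                                .withTempDirectory("/tmp/temp-beam")
                                .to(options.getGCSBucketUrl())
                                // writeDynamic() needs a naming function; this one names each file after its key
                                .withNaming(key -> FileIO.Write.defaultNaming(key, ".parquet"))
                                // a single shard, i.e. one file per key
                                .withNumShards(1)
                                .withDestinationCoder(StringUtf8Coder.of())
                );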
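The answer shows neither the body of GetIterable nor what writeDynamic() does with the flattened pairs. The plain-Java sketch below models both under stated assumptions, so it runs without Beam on the classpath: Map.Entry stands in for Beam's KV, a type parameter stands in for GenericRecord, the class and method names (DynamicParquetWriteSketch, attachFileKey, flatten, routeToFiles) and the keys "orders-a"/"orders-b" are hypothetical, and GetIterable is assumed to pair the group's key with each record and drop the Long. Each entry of the resulting map corresponds to one Parquet file named after its key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the answer's last steps; NOT Beam code.
// Map.Entry stands in for Beam's KV and the type parameter R for GenericRecord.
public class DynamicParquetWriteSketch {

    // Hypothetical body for GetIterable: turns one grouped element
    // KV<fileKey, Iterable<KV<Long, R>>> into Iterable<KV<fileKey, R>>
    // by copying the file key onto every record and dropping the Long.
    public static <R> List<Map.Entry<String, R>> attachFileKey(
            String fileKey, Iterable<Map.Entry<Long, R>> records) {
        List<Map.Entry<String, R>> out = new ArrayList<>();
        for (Map.Entry<Long, R> record : records) {
            out.add(Map.entry(fileKey, record.getValue()));
        }
        return out;
    }

    // Mirrors Flatten.iterables(): concatenates the per-key lists into one list.
    public static <T> List<T> flatten(Iterable<? extends Iterable<T>> groups) {
        List<T> out = new ArrayList<>();
        for (Iterable<T> group : groups) {
            for (T item : group) {
                out.add(item);
            }
        }
        return out;
    }

    // Mirrors the routing in writeDynamic(): `by` picks the destination
    // (file key) and `via` picks what is written. Each map entry stands for
    // one output file, as with withNumShards(1).
    public static <T, R> Map<String, List<R>> routeToFiles(
            Iterable<T> elements, Function<T, String> by, Function<T, R> via) {
        Map<String, List<R>> files = new LinkedHashMap<>();
        for (T element : elements) {
            files.computeIfAbsent(by.apply(element), k -> new ArrayList<>())
                    .add(via.apply(element));
        }
        return files;
    }

    public static void main(String[] args) {
        // Hypothetical GroupByKey output: two file keys with (Long, record) pairs.
        Map<String, List<Map.Entry<Long, String>>> grouped = new LinkedHashMap<>();
        grouped.put("orders-a", List.of(Map.entry(1L, "r1"), Map.entry(3L, "r3")));
        grouped.put("orders-b", List.of(Map.entry(2L, "r2")));

        // GetIterable step: one list of (fileKey, record) pairs per group.
        List<List<Map.Entry<String, String>>> perGroup = new ArrayList<>();
        grouped.forEach((key, records) -> perGroup.add(attachFileKey(key, records)));

        // Flatten.iterables(), then the equivalent of .by(KV::getKey).via(KV::getValue).
        List<Map.Entry<String, String>> pairs = flatten(perGroup);
        Map<String, List<String>> files =
                routeToFiles(pairs, Map.Entry::getKey, Map.Entry::getValue);
        System.out.println(files); // {orders-a=[r1, r3], orders-b=[r2]}
    }
}
```

In a real pipeline, the loop in attachFileKey would sit in the @ProcessElement method of the GetIterable DoFn, with its result emitted via c.output(...). Note that routeToFiles only looks at each pair's key, so in this model the grouping step does not change which records end up in which file.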